You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/20 02:19:46 UTC

[1/3] git commit: PHOENIX-1349 VIEWs do not inherit indexes from their parent

Repository: phoenix
Updated Branches:
  refs/heads/4.0 8ba26edb4 -> 90dfe8160


PHOENIX-1349 VIEWs do not inherit indexes from their parent


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b90f5187
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b90f5187
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b90f5187

Branch: refs/heads/4.0
Commit: b90f51876ffa957cdcdb3a5d88adde70c2da9f36
Parents: 8ba26ed
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Oct 18 22:19:26 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:23:43 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/compile/FromCompiler.java    |   4 +-
 .../apache/phoenix/compile/JoinCompiler.java    |  10 +-
 .../coprocessor/MetaDataEndpointImpl.java       |   6 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |   6 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../phoenix/query/ConnectionQueryServices.java  |   3 +-
 .../query/ConnectionQueryServicesImpl.java      |  60 ++++++--
 .../query/ConnectionlessQueryServicesImpl.java  |   6 +-
 .../query/DelegateConnectionQueryServices.java  |   8 +-
 .../apache/phoenix/schema/DelegateTable.java    |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   | 145 ++++++++++++-------
 .../java/org/apache/phoenix/schema/PTable.java  |   6 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  85 ++++++-----
 .../phoenix/compile/QueryOptimizerTest.java     |   4 -
 14 files changed, 212 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 6f7b006..0fed42a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -441,8 +441,8 @@ public class FromCompiler {
             }
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, 
                     PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, 
-                    null, null, columns, null, Collections.<PTable>emptyList(), false, 
-                    Collections.<PName>emptyList(), null, null, false, false, null, null, null);
+                    null, null, columns, null, null, Collections.<PTable>emptyList(), 
+                    false, Collections.<PName>emptyList(), null, null, false, false, null, null, null);
             
             String alias = subselectNode.getAlias();
             TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index b6a43b7..ef053de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -744,8 +744,8 @@ public class JoinCompiler {
             
             PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN,
                         table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
-                        retainPKColumns ? table.getBucketNum() : null, projectedColumns, table.getParentTableName(),
-                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
+                        retainPKColumns ? table.getBucketNum() : null, projectedColumns, table.getParentSchemaName(),
+                        table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
             return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
         }
         
@@ -794,8 +794,8 @@ public class JoinCompiler {
             }
             PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN,
                         table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
-                        null, projectedColumns, table.getParentTableName(),
-                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
+                        null, projectedColumns, table.getParentSchemaName(),
+                        table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
             return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
         }
     }
@@ -1300,7 +1300,7 @@ public class JoinCompiler {
             }
             PTable t = PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(),
                     PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), left.getBucketNum(), merged,
-                    left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getViewType(), left.getViewIndexId(), left.getIndexType());
+                    left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getViewType(), left.getViewIndexId(), left.getIndexType());
 
             ListMultimap<String, String> mergedMap = ArrayListMultimap.<String, String>create();
             mergedMap.putAll(this.getColumnNameMap());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c0d29bf..047a947 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -617,9 +617,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
         }
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, 
-            tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, 
-            indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, 
-            multiTenant, viewType, viewIndexId, indexType, stats);
+            tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null, 
+            tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, 
+            disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
     }
 
     private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 9a3e399..03c643d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -199,13 +199,13 @@ public abstract class BaseQueryPlan implements QueryPlan {
                 KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
                 // Set key value schema of the data columns.
                 serializeSchemaIntoScan(scan, schema);
-                String schemaName = context.getCurrentTable().getTable().getSchemaName().getString();
+                String parentSchema = context.getCurrentTable().getTable().getParentSchemaName().getString();
                 String parentTable = context.getCurrentTable().getTable().getParentTableName().getString();
                 final ParseNodeFactory FACTORY = new ParseNodeFactory();
                 TableRef dataTableRef =
                         FromCompiler.getResolver(
-                            FACTORY.namedTable(null, TableName.create(schemaName, parentTable)),
-                            context.getConnection()).resolveTable(schemaName, parentTable);
+                            FACTORY.namedTable(null, TableName.create(parentSchema, parentTable)),
+                            context.getConnection()).resolveTable(parentSchema, parentTable);
                 PTable dataTable = dataTableRef.getTable();
                 // Set index maintainer of the local index.
                 serializeIndexMaintainerIntoScan(scan, dataTable);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 6b74822..6a68df3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -209,7 +209,7 @@ public class QueryOptimizer {
     private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan) throws SQLException {
         int nColumns = dataPlan.getProjector().getColumnCount();
         String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // double quote in case it's case sensitive
-        String schemaName = dataPlan.getTableRef().getTable().getSchemaName().getString();
+        String schemaName = index.getParentSchemaName().getString();
         schemaName = schemaName.length() == 0 ? null :  '"' + schemaName + '"';
 
         String tableName = '"' + index.getTableName().getString() + '"';

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index c017b77..8af9310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -106,8 +106,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public String getUserName();
     public void incrementTableTimeStamp(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, long clientTS) throws SQLException;
 
-    public PTableStats getTableStats(String physicalName);
-    public void addTableStats(String physicalName, PTableStats tableStats);
+    public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) throws SQLException;
     
     public void clearCache() throws SQLException;
     public int getSequenceSaltBuckets();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 415fbca..d46497d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -32,6 +32,7 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.GuardedBy;
@@ -97,6 +98,7 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -123,6 +125,7 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
@@ -159,7 +162,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final ReadOnlyProps props;
     private final String userName;
     private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
-    private final Cache<String, PTableStats> tableStatsCache;
+    private final Cache<ImmutableBytesPtr, PTableStats> tableStatsCache;
     
     // Cache the latest meta data here for future connections
     // writes guarded by "latestMetaDataLock"
@@ -1015,7 +1018,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             try {
                 desc = admin.getTableDescriptor(physicalIndexName);
                 if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) {
-                    this.tableStatsCache.invalidate(Bytes.toString(physicalIndexName));
+                    this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
                     final ReadOnlyProps props = this.getProps();
                     final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
                     if (dropMetadata) {
@@ -1050,7 +1053,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             try {
                 desc = admin.getTableDescriptor(physicalIndexName);
                 if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-                    this.tableStatsCache.invalidate(Bytes.toString(physicalIndexName));
+                    this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
                     final ReadOnlyProps props = this.getProps();
                     final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
                     if (dropMetadata) {
@@ -1219,11 +1222,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
             invalidateTables(result.getTableNamesToDelete());
             if (tableType == PTableType.TABLE) {
-                byte[] physicalTableName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
+                byte[] physicalName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
                 long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
-                ensureViewIndexTableDropped(physicalTableName, timestamp);
-                ensureLocalIndexTableDropped(physicalTableName, timestamp);
-                tableStatsCache.invalidate(SchemaUtil.getTableName(schemaBytes, tableBytes));
+                ensureViewIndexTableDropped(physicalName, timestamp);
+                ensureLocalIndexTableDropped(physicalName, timestamp);
+                tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName));
             }
             break;
         default:
@@ -1235,7 +1238,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private void invalidateTables(final List<byte[]> tableNamesToDelete) {
         if (tableNamesToDelete != null) {
             for ( byte[] tableName : tableNamesToDelete ) {
-                tableStatsCache.invalidate(Bytes.toString(tableName));
+                tableStatsCache.invalidate(new ImmutableBytesPtr(tableName));
             }
         }
     }
@@ -1913,7 +1916,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 sqlE = new SQLException(e);
             } finally {
                 try {
-                    if (tenantId.length == 0) tableStatsCache.invalidate(SchemaUtil.getTableName(schemaName, tableName));
+                    if (tenantId.length == 0) tableStatsCache.invalidate(new ImmutableBytesPtr(SchemaUtil.getTableNameAsBytes(schemaName, tableName)));
                     htable.close();
                 } catch (IOException e) {
                     if (sqlE == null) {
@@ -2096,15 +2099,42 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private void throwConnectionClosedException() {
         throw new IllegalStateException("Connection to the cluster is closed");
     }
-    @Override
-    public PTableStats getTableStats(String physicalName) {
-        return tableStatsCache.getIfPresent(physicalName);
-    }
     
     @Override
-    public void addTableStats(String physicalName, PTableStats tableStats) {
-        tableStatsCache.put(physicalName, tableStats);
+    public PTableStats getTableStats(final byte[] physicalName, final long clientTimeStamp) throws SQLException {
+        try {
+            return tableStatsCache.get(new ImmutableBytesPtr(physicalName), new Callable<PTableStats>() {
+                @Override
+                public PTableStats call() throws Exception {
+                    /*
+                     *  The shared view index case is tricky, because we don't have
+                     *  table metadata for it, only an HBase table. We do have stats,
+                     *  though, so we'll query them directly here and cache them so
+                     *  we don't keep querying for them.
+                     */
+                    HTableInterface statsHTable = ConnectionQueryServicesImpl.this.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+                    try {
+                        return StatisticsUtil.readStatistics(statsHTable, physicalName, clientTimeStamp);
+                    } catch (IOException e) {
+                        logger.warn("Unable to read from stats table", e);
+                        // Just cache empty stats. We'll try again after some time anyway.
+                        return PTableStats.EMPTY_STATS;
+                    } finally {
+                        try {
+                            statsHTable.close();
+                        } catch (IOException e) {
+                            // Log, but continue. We have our stats anyway now.
+                            logger.warn("Unable to close stats table", e);
+                        }
+                    }
+                }
+                
+            });
+        } catch (ExecutionException e) {
+            throw ServerUtil.parseServerException(e);
+        }
     }
+    
     @Override
     public int getSequenceSaltBuckets() {
         return nSequenceSaltBuckets;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 9bd30a3..b04d29d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -427,15 +427,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public PTableStats getTableStats(String physicalName) {
+    public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) {
         return PTableStats.EMPTY_STATS;
     }
 
     @Override
-    public void addTableStats(String physicalName, PTableStats tableStats) {
-    }
-
-    @Override
     public void clearCache() throws SQLException {
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 34bca4d..bb4bb33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -235,14 +235,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public PTableStats getTableStats(String physicalName) {
-        return getDelegate().getTableStats(physicalName);
+    public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) throws SQLException {
+        return getDelegate().getTableStats(physicalName, clientTimeStamp);
     }
 
-    @Override
-    public void addTableStats(String physicalName, PTableStats tableStats) {
-        getDelegate().addTableStats(physicalName, tableStats);
-    }
 
     @Override
     public void clearCache() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index f4bf1cc..b7ab899 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -225,4 +225,9 @@ public class DelegateTable implements PTable {
     public DelegateTable(PTable delegate) {
         this.delegate = delegate;
     }
+
+    @Override
+    public PName getParentSchemaName() {
+        return delegate.getParentSchemaName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 12a11bc..924906e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -137,7 +136,6 @@ import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -352,6 +350,7 @@ public class MetaDataClient {
                 // Otherwise, a tenant would be required to create a VIEW first
                 // which is not really necessary unless you want to filter or add
                 // columns
+                addIndexesFromPhysicalTable(result);
                 connection.addTable(resultTable);
                 return result;
             } else {
@@ -365,6 +364,7 @@ public class MetaDataClient {
                 if (table != null) {
                     result.setTable(table);
                     if (code == MutationCode.TABLE_ALREADY_EXISTS) {
+                        addIndexesFromPhysicalTable(result);
                         return result;
                     }
                     if (code == MutationCode.TABLE_NOT_FOUND && tryCount + 1 == maxTryCount) {
@@ -377,6 +377,60 @@ public class MetaDataClient {
         
         return result;
     }
+    
+    /**
+     * Fault in the physical table to the cache and add any indexes it has to the indexes
+     * of the table for which we just updated.
+     * TODO: combine this round trip with the one that updates the cache for the child table.
+     * @param result the result from updating the cache for the current table.
+     * @return true if the PTable contained by result was modified and false otherwise
+     * @throws SQLException if the physical table cannot be found
+     */
+    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) throws SQLException {
+        PTable table = result.getTable();
+        // If not a view or if a view directly over an HBase table, there's nothing to do
+        if (table.getType() != PTableType.VIEW || table.getViewType() == ViewType.MAPPED) {
+            return false;
+        }
+        String physicalName = table.getPhysicalName().getString();
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
+        String tableName = SchemaUtil.getTableNameFromFullName(physicalName);
+        MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false);
+        PTable physicalTable = parentResult.getTable();
+        if (physicalTable == null) {
+            throw new TableNotFoundException(schemaName, tableName);
+        }
+        if (!result.wasUpdated() && !parentResult.wasUpdated()) {
+            return false;
+        }
+        List<PTable> indexes = physicalTable.getIndexes();
+        if (indexes.isEmpty()) {
+            return false;
+        }
+        // Filter out indexes if column doesn't exist in view
+        List<PTable> allIndexes = Lists.newArrayListWithExpectedSize(indexes.size() + table.getIndexes().size());
+        if (result.wasUpdated()) { // Table from server never contains inherited indexes
+            allIndexes.addAll(table.getIndexes());
+        } else { // Only add original ones, as inherited ones may have changed
+            for (PTable index : indexes) {
+                if (index.getViewIndexId() != null) {
+                    allIndexes.add(index);
+                }
+            }
+        }
+        for (PTable index : indexes) {
+            for (PColumn pkColumn : index.getPKColumns()) {
+                try {
+                    IndexUtil.getDataColumn(table, pkColumn.getName().getString());
+                    allIndexes.add(index);
+                } catch (IllegalArgumentException e) { // Ignore, and continue, as column was not found
+                }
+            }
+        }
+        PTable allIndexesTable = PTableImpl.makePTable(table, table.getTimeStamp(), allIndexes);
+        result.setTable(allIndexesTable);
+        return true;
+    }
 
 
     private void addColumnMutation(String schemaName, String tableName, PColumn column, PreparedStatement colUpsert, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
@@ -1325,11 +1379,12 @@ public class MetaDataClient {
             // Bootstrapping for our SYSTEM.TABLE that creates itself before it exists 
             if (SchemaUtil.isMetaTable(schemaName,tableName)) {
                 // TODO: what about stats for system catalog?
-                PTable table = PTableImpl.makePTable(tenantId,PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType,
+                PName newSchemaName = PNameFactory.newName(schemaName);
+                PTable table = PTableImpl.makePTable(tenantId,newSchemaName, PNameFactory.newName(tableName), tableType,
                         null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
-                        PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, Collections.<PTable>emptyList(), 
-                        isImmutableRows, Collections.<PName>emptyList(),
-                        defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, Boolean.TRUE.equals(disableWAL), false, null, indexId, indexType);
+                        PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, null, 
+                        Collections.<PTable>emptyList(), isImmutableRows,
+                        Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, Boolean.TRUE.equals(disableWAL), false, null, indexId, indexType);
                 connection.addTable(table);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -1471,11 +1526,12 @@ public class MetaDataClient {
                 connection.addTable(result.getTable());
                 throw new ConcurrentTableMutationException(schemaName, tableName);
             default:
+                PName newSchemaName = PNameFactory.newName(schemaName);
                 PTable table =  PTableImpl.makePTable(
-                        tenantId, PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(), 
+                        tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(), 
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns, 
-                        dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows, physicalNames,
-                        defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, viewType, indexId, indexType);
+                        dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
+                        physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, viewType, indexId, indexType);
                 connection.addTable(table);
                 return table;
             }
@@ -2374,57 +2430,36 @@ public class MetaDataClient {
     }
     
     public PTableStats getTableStats(PTable table) throws SQLException {
-        boolean isView = table.getType() == PTableType.VIEW;
+        /*
+         *  The shared view index case is tricky, because we don't have
+         *  table meta data for it, only an HBase table. We do have stats,
+         *  though, so we'll query them directly here and cache them so
+         *  we don't keep querying for them.
+         */
         boolean isSharedIndex = table.getViewIndexId() != null;
-        if (!isView && !isSharedIndex) {
-            return table.getTableStats();
+        if (isSharedIndex) {
+            return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getClientTimeStamp());
         }
+        boolean isView = table.getType() == PTableType.VIEW;
         String physicalName = table.getPhysicalName().getString();
-        // If we have a VIEW or a local or view INDEX, check our cache rather
-        // than updating the cache for that table to prevent an extra roundtrip.
-        PTableStats tableStats = connection.getQueryServices().getTableStats(physicalName);
-        if (tableStats != null) {
-            return tableStats;
-        }
-        if (isView) {
-            String physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
-            String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName);
-            MetaDataMutationResult result = updateCache(null, /* use global tenant id to get physical table */
-                    physicalSchemaName, physicalTableName);
-            PTable physicalTable = result.getTable();
-            if(physicalTable == null) {
-                // We should be able to find the physical table, as we found the logical one
-                // Might mean the physical table as just deleted.
-                logger.warn("Unable to retrieve physical table " + physicalName + " for table " + table.getName().getString());
-                throw new TableNotFoundException(table.getSchemaName().getString(),table.getTableName().getString());
-            }
-            tableStats = physicalTable.getTableStats();
-        } else {
-            /*
-             *  Otherwise, we have a shared view. This case is tricky, because we don't have
-             *  table metadata for it, only an HBase table. We do have stats, though, so we'll
-             *  query them directly here and cache them so we don't keep querying for them.
-             */
-            HTableInterface statsHTable = connection.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+        if (isView && table.getViewType() != ViewType.MAPPED) {
             try {
-                long clientTimeStamp = getClientTimeStamp();
-                tableStats = StatisticsUtil.readStatistics(statsHTable, table.getPhysicalName().getBytes(), clientTimeStamp);
-            } catch (IOException e) {
-                logger.warn("Unable to read from stats table", e);
-                // Just cache empty stats. We'll try again after some time anyway.
-                tableStats = PTableStats.EMPTY_STATS;
-            } finally {
-                try {
-                    statsHTable.close();
-                } catch (IOException e) {
-                    // Log, but continue. We have our stats anyway now.
-                    logger.warn("Unable to close stats table", e);
-                }
+                return connection.getMetaDataCache().getTable(new PTableKey(null, physicalName)).getTableStats();
+            } catch (TableNotFoundException e) {
+                // Possible when the table timestamp == current timestamp - 1.
+                // This would be most likely during the initial index build of a view index
+                // where we're doing an upsert select from the tenant specific table.
+                // TODO: would we want to always load the physical table in updateCache in
+                // this case too, as we might not update the view with all of it's indexes?
+                String physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
+                String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName);
+                MetaDataMutationResult result = updateCache(null, physicalSchemaName, physicalTableName, false);
+                if (result.getTable() == null) {
+                    throw new TableNotFoundException(physicalSchemaName, physicalTableName);
+                }
+                return result.getTable().getTableStats();
             }
         }
-        // Cache these stats so that we don't keep making a roundrip just to get the stats (as
-        // they don't change very often.
-        connection.getQueryServices().addTableStats(physicalName, tableStats);
-        return tableStats;
+        return table.getTableStats();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 4193200..d59e9c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -285,6 +285,12 @@ public interface PTable {
      * on or null if not an index.
      */
     PName getParentTableName();
+    /**
+     * Gets the schema name of the data table for an index table.
+     * @return the schema name of the data table that this index is
+     * on or null if not an index.
+     */
+    PName getParentSchemaName();
     
     /**
      * For a view, return the name of table in Phoenix that physically stores data.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 8f88e51..2448f39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -103,6 +103,7 @@ public class PTableImpl implements PTable {
     private List<PTable> indexes;
     // Data table name that the index is created on.
     private PName parentName;
+    private PName parentSchemaName;
     private PName parentTableName;
     private List<PName> physicalNames;
     private boolean isImmutableRows;
@@ -173,74 +174,78 @@ public class PTableImpl implements PTable {
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes) throws SQLException {
+        return makePTable(table, timeStamp, indexes, table.getSchemaName());
+    }
+
+    public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes, PName parentSchemaName) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, 
-                table.getSequenceNumber() + 1, table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), indexes,
-                table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
-                table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.getSequenceNumber() + 1, table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
+                indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+                table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), 
-                table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(),
-                table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
-                table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
+                table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+                table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, 
-                sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
-                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(),
-                table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
+                table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
+                table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, 
-                sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(),
-                isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
-                table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
+                table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+                table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
     }
 
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), state, table.getTimeStamp(), 
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), 
-                table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
-                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), 
-                table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+                table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
+                table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), 
+                table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
-            PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
-            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType,
-            Short viewIndexId, IndexType indexType) throws SQLException {
-        return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName,
-                indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType,
-                PTableStats.EMPTY_STATS);
+            PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
+            boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
+            ViewType viewType, Short viewIndexId, IndexType indexType) throws SQLException {
+        return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
+                dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId,
+                indexType, PTableStats.EMPTY_STATS);
     }
     
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
             PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
-            List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
-            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
-            boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats)
+            List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
+            boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
+            boolean disableWAL, boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
-                bucketNum, columns, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
-                viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
+                bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
+                defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
             long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns,
-            PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
-            PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType,
-            Short viewIndexId, IndexType indexType, PTableStats stats) throws SQLException {
+            PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
+            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
+            ViewType viewType, Short viewIndexId, IndexType indexType, PTableStats stats) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
-                stats, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression,
-                disableWAL, multiTenant, viewType, viewIndexId, indexType);
+                stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
+                viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType);
     }
 
     @Override
@@ -260,9 +265,9 @@ public class PTableImpl implements PTable {
     }
     
     private void init(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
-            PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes,
-            boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
-            ViewType viewType, Short viewIndexId, IndexType indexType ) throws SQLException {
+            PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
+            List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
+            boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType ) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -379,9 +384,10 @@ public class PTableImpl implements PTable {
             estimatedSize += index.getEstimatedSize();
         }
         
+        this.parentSchemaName = parentSchemaName;
         this.parentTableName = parentTableName;
         this.parentName = parentTableName == null ? null : PNameFactory.newName(SchemaUtil.getTableName(
-                schemaName.getString(), parentTableName.getString()));
+                parentSchemaName.getString(), parentTableName.getString()));
         estimatedSize += PNameFactory.getEstimatedSize(this.parentName);
         
         this.physicalNames = physicalNames == null ? ImmutableList.<PName>of() : ImmutableList.copyOf(physicalNames);
@@ -911,8 +917,8 @@ public class PTableImpl implements PTable {
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
-          (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, dataTableName,indexes, isImmutableRows, 
-              physicalNames, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId, indexType);
+          (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes, 
+              isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId, indexType);
         return result;
       } catch (SQLException e) {
         throw new RuntimeException(e); // Impossible
@@ -1005,5 +1011,10 @@ public class PTableImpl implements PTable {
     public PTableStats getTableStats() {
         return tableStats;
     }
+
+    @Override
+    public PName getParentSchemaName() {
+        return parentSchemaName;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 81f4a45..c046bc6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -336,7 +335,6 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
     }
     
     @Test
-    @Ignore // FIXME : https://issues.apache.org/jira/browse/PHOENIX-1302
     // Multi-tenant = true; Query uses index = true; Salted = false
     public void testAssertQueryPlanDetails3() throws Exception {
         testAssertQueryPlanDetails(true, true, true);
@@ -361,7 +359,6 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
     }
     
     @Test
-    @Ignore // FIXME : https://issues.apache.org/jira/browse/PHOENIX-1302
     // Multi-tenant = true; Query uses index = true; Salted = false
     public void testAssertQueryPlanDetails7() throws Exception {
         testAssertQueryPlanDetails(true, true, false);
@@ -479,7 +476,6 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
     }
     
     @Test
-    @Ignore // FIXME : https://issues.apache.org/jira/browse/PHOENIX-1302
     public void testAssertQueryAgainstTenantSpecificViewGoesThroughIndex() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl(), new Properties());
         


[2/3] git commit: PHOENIX-1271 hide tenant column in tenant-specific connections (Bruno Dumon)

Posted by ja...@apache.org.
PHOENIX-1271 hide tenant column in tenant-specific
 connections (Bruno Dumon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bed96f15
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bed96f15
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bed96f15

Branch: refs/heads/4.0
Commit: bed96f15d80d4565dd116b57a2677bb8f3b65a31
Parents: b90f518
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 19 15:47:39 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:24:11 2014 -0700

----------------------------------------------------------------------
 .../end2end/TenantSpecificTablesDDLIT.java      |  6 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   | 86 +++++++++++++++++++-
 2 files changed, 84 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bed96f15/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index a9fa412..589e963 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -556,8 +556,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "user");
             assertTrue(rs.next());
-            assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "tenant_id");
-            assertTrue(rs.next());
+            // (tenant_id column is not visible in tenant-specific connection)
             assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "tenant_type_id");
             assertTrue(rs.next());
             assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "id");
@@ -566,8 +565,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "user");
             assertTrue(rs.next());
-            assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "tenant_id");
-            assertTrue(rs.next());
+            // (tenant_id column is not visible in tenant-specific connection)
             assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "id");
             assertTrue(rs.next());
             assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "tenant_col");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bed96f15/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 7a1f2be..54dfae3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -42,7 +42,9 @@ import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
 import org.apache.phoenix.expression.function.SQLTableTypeFunction;
 import org.apache.phoenix.expression.function.SQLViewTypeFunction;
 import org.apache.phoenix.expression.function.SqlTypeNameFunction;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.HintNode.Hint;
@@ -372,7 +374,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
     }
 
     private static void appendConjunction(StringBuilder buf) {
-        buf.append(buf.length() == 0 ? " where " : " and ");
+        buf.append(buf.length() == 0 ? "" : " and ");
     }
     
     @Override
@@ -404,7 +406,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
                 ARRAY_SIZE + "," +
                 COLUMN_FAMILY + "," +
                 DATA_TYPE + " " + TYPE_ID + "," +// raw type id for potential internal consumption
-                VIEW_CONSTANT +
+                VIEW_CONSTANT + "," +
+                MULTI_TENANT + "," +
+                KEY_SEQ +
                 " from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS);
         StringBuilder where = new StringBuilder();
         addTenantIdFilter(where, catalog);
@@ -443,12 +447,86 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
             appendConjunction(where);
             where.append(COLUMN_NAME + " is not null" );
         }
-        buf.append(where);
+        boolean isTenantSpecificConnection = connection.getTenantId() != null;
+        if (isTenantSpecificConnection) {
+            buf.append(" where (" + where + ") OR ("
+                    + COLUMN_FAMILY + " is null AND " +  COLUMN_NAME + " is null)");
+        } else {
+            buf.append(" where " + where);
+        }
         buf.append(" order by " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + ORDINAL_POSITION);
-        Statement stmt = connection.createStatement();
+
+        Statement stmt;
+        if (isTenantSpecificConnection) {
+            stmt = connection.createStatement(new PhoenixStatementFactory() {
+                @Override
+                public PhoenixStatement newStatement(PhoenixConnection connection) {
+                    return new PhoenixStatement(connection) {
+                        @Override
+                        protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector)
+                                throws SQLException {
+                            return new PhoenixResultSet(
+                                    new TenantColumnFilteringIterator(iterator, projector),
+                                    projector, this);
+                        }
+                    };
+                }
+            });
+        } else {
+            stmt = connection.createStatement();
+        }
         return stmt.executeQuery(buf.toString());
     }
 
+    /**
+     * Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition).
+     * The tenant id is by definition the first column of the primary key, but the primary key does not necessarily
+     * start at the first column. Assumes columns are sorted on ordinal position.
+     */
+    private static class TenantColumnFilteringIterator extends DelegateResultIterator {
+        private final RowProjector rowProjector;
+        private final int columnFamilyIndex;
+        private final int columnNameIndex;
+        private final int multiTenantIndex;
+        private final int keySeqIndex;
+        private boolean inMultiTenantTable;
+
+        private TenantColumnFilteringIterator(ResultIterator delegate, RowProjector rowProjector) throws SQLException {
+            super(delegate);
+            this.rowProjector = rowProjector;
+            this.columnFamilyIndex = rowProjector.getColumnIndex(COLUMN_FAMILY);
+            this.columnNameIndex = rowProjector.getColumnIndex(COLUMN_NAME);
+            this.multiTenantIndex = rowProjector.getColumnIndex(MULTI_TENANT);
+            this.keySeqIndex = rowProjector.getColumnIndex(KEY_SEQ);
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            Tuple tuple = super.next();
+
+            while (tuple != null
+                    && getColumn(tuple, columnFamilyIndex) == null && getColumn(tuple, columnNameIndex) == null) {
+                // new table, check if it is multitenant
+                inMultiTenantTable = getColumn(tuple, multiTenantIndex) == Boolean.TRUE;
+                // skip row representing table
+                tuple = super.next();
+            }
+
+            if (tuple != null && inMultiTenantTable && new Short((short)1).equals(getColumn(tuple, keySeqIndex))) {
+                // skip tenant id primary key column
+                return next();
+            }
+
+            return tuple;
+        }
+
+        private Object getColumn(Tuple tuple, int index) throws SQLException {
+            ColumnProjector projector = this.rowProjector.getColumnProjector(index);
+            PDataType type = projector.getExpression().getDataType();
+            return projector.getValue(tuple, type, new ImmutableBytesPtr());
+        }
+    }
+
     @Override
     public Connection getConnection() throws SQLException {
         return connection;


[3/3] git commit: PHOENIX-1344 fix duplicit values in NTH_VALUE function (Vaclav Loffelmann)

Posted by ja...@apache.org.
PHOENIX-1344 fix duplicit values in NTH_VALUE function (Vaclav Loffelmann)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/90dfe816
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/90dfe816
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/90dfe816

Branch: refs/heads/4.0
Commit: 90dfe8160c493884ebac80e9f14ad54ee443fdb6
Parents: bed96f1
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 19 17:18:21 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:24:26 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/NthValueFunctionIT.java     | 150 +++++++++++++++++++
 .../FirstLastValueBaseClientAggregator.java     |  37 +++--
 .../FirstLastValueServerAggregator.java         |  46 ++++--
 .../util/FirstLastNthValueDataContainer.java    |  55 ++++---
 4 files changed, 244 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
index 1cf2643..ec5bf96 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.util.ArrayList;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -152,4 +153,153 @@ public class NthValueFunctionIT extends BaseHBaseManagedTimeIT {
         assertFalse(rs.next());
     }
 
+    @Test
+    public void nonUniqueValuesInOrderByAsc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 3) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{2, 4, 9}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderByAscSkipDuplicit() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 5) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertEquals(3, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderByDesc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 3) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{2, 4, 9}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderNextValueDesc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 0, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 1, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (7, 8, 3, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 2) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{3, 5}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nonUniqueValuesInOrderNextValueAsc() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 0, 7)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 1, 9)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+        conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (7, 8, 3, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 5) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertInIntArray(new int[]{3, 5}, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void ignoreNullValues() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+
+        String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+                + "(id INTEGER NOT NULL, page_id UNSIGNED_LONG,"
+                + " dates BIGINT NOT NULL, \"value\" BIGINT NULL CONSTRAINT pk PRIMARY KEY (id, dates))";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (1, 8, 1, 1)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (2, 8, 2, NULL)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (3, 8, 3, NULL)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (5, 8, 4, 4)");
+        conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (4, 8, 5, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT NTH_VALUE(\"value\", 2)  WITHIN GROUP (ORDER BY dates DESC) FROM nth_test_table GROUP BY page_id");
+
+        assertTrue(rs.next());
+        assertEquals(rs.getLong(1), 4);
+        assertFalse(rs.next());
+    }
+
+    private void assertInIntArray(int[] should, int actualValue) {
+        ArrayList<Integer> shouldList = new ArrayList<Integer>();
+        for (int i: should) {
+            shouldList.add(i);
+        }
+        assertTrue(shouldList.contains(actualValue));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
index fde79ba..6dfca39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.expression.aggregator;
 
+import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -42,7 +44,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
     protected int offset = -1;
     protected BinaryComparator topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
     protected byte[] topValue = null;
-    protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new ByteArrayComparator());
+    protected TreeMap<byte[], LinkedList<byte[]>> topValues = new TreeMap<byte[], LinkedList<byte[]>>(new ByteArrayComparator());
     protected boolean isAscending;
 
     public FirstLastValueBaseClientAggregator() {
@@ -63,7 +65,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
                 return false;
             }
 
-            Set<Map.Entry<byte[], byte[]>> entrySet;
+            Set<Map.Entry<byte[], LinkedList<byte[]>>> entrySet;
             if (isAscending) {
                 entrySet = topValues.entrySet();
             } else {
@@ -71,10 +73,14 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
             }
 
             int counter = offset;
-            for (Map.Entry<byte[], byte[]> entry : entrySet) {
-                if (--counter == 0) {
-                    ptr.set(entry.getValue());
-                    return true;
+            for (Map.Entry<byte[], LinkedList<byte[]>> entry : entrySet) {
+                ListIterator<byte[]> it = entry.getValue().listIterator();
+                while (it.hasNext()) {
+                    if (--counter == 0) {
+                        ptr.set(it.next());
+                        return true;
+                    }
+                    it.next();
                 }
             }
 
@@ -103,13 +109,22 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
 
         payload.setPayload(ptr.copyBytes());
         isAscending = payload.getIsAscending();
-        TreeMap serverAggregatorResult = payload.getData();
+        TreeMap<byte[], LinkedList<byte[]>> serverAggregatorResult = payload.getData();
 
         if (useOffset) {
-            payload.setOffset(offset);
-            topValues.putAll(serverAggregatorResult);
+            //merge topValues
+            for (Entry<byte[], LinkedList<byte[]>> entry : serverAggregatorResult.entrySet()) {
+                byte[] itemKey = entry.getKey();
+                LinkedList<byte[]> itemList = entry.getValue();
+
+                if (topValues.containsKey(itemKey)) {
+                    topValues.get(itemKey).addAll(itemList);
+                } else {
+                    topValues.put(itemKey, itemList);
+                }
+            }
         } else {
-            Entry<byte[], byte[]> valueEntry = serverAggregatorResult.firstEntry();
+            Entry<byte[], LinkedList<byte[]>> valueEntry = serverAggregatorResult.firstEntry();
             byte[] currentOrder = valueEntry.getKey();
 
             boolean isBetter;
@@ -120,7 +135,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
             }
             if (topOrder.getValue().length < 1 || isBetter) {
                 topOrder = new BinaryComparator(currentOrder);
-                topValue = valueEntry.getValue();
+                topValue = valueEntry.getValue().getFirst();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
index 90b7826..5e51e07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.TreeMap;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,11 +46,12 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
     protected byte[] topValue;
     protected boolean useOffset = false;
     protected int offset = -1;
-    protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+    protected TreeMap<byte[], LinkedList<byte[]>> topValues = new TreeMap<byte[], LinkedList<byte[]>>(new Bytes.ByteArrayComparator());
     protected boolean isAscending;
     protected boolean hasValueDescSortOrder;
     protected Expression orderByColumn;
     protected Expression dataColumn;
+    protected int topValuesCount = 0;
 
     public FirstLastValueServerAggregator() {
         super(SortOrder.getDefault());
@@ -60,6 +62,7 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
         topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
         topValue = null;
         topValues.clear();
+        topValuesCount = 0;
         offset = -1;
         useOffset = false;
     }
@@ -81,7 +84,7 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
 
         if (useOffset) {
             boolean addFlag = false;
-            if (topValues.size() < offset) {
+            if (topValuesCount < offset) {
                 try {
                     addFlag = true;
                 } catch (Exception e) {
@@ -89,25 +92,27 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
                 }
             } else {
                 if (isAscending) {
-                    byte[] lowestKey = topValues.lastKey();
-                    if (Bytes.compareTo(currentOrder, lowestKey) < 0) {
-                        topValues.remove(lowestKey);
+                    if (removeLastElement(currentOrder, topValues.lastKey(), -1)) {
                         addFlag = true;
+                        topValuesCount--;
                     }
-                } else { //desc
-                    byte[] highestKey = topValues.firstKey();
-                    if (Bytes.compareTo(currentOrder, highestKey) > 0) {
-                        topValues.remove(highestKey);
+                } else {
+                    if (removeLastElement(currentOrder, topValues.firstKey(), 1)) {
                         addFlag = true;
+                        topValuesCount--;
                     }
                 }
             }
             if (addFlag) {
+                topValuesCount++;
+                if (!topValues.containsKey(currentOrder)) {
+                    topValues.put(currentOrder, new LinkedList<byte[]>());
+                }
                 //invert bytes if is SortOrder set
                 if (hasValueDescSortOrder) {
-                    topValues.put(currentOrder, SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()));
+                    topValues.get(currentOrder).push(SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()));
                 } else {
-                    topValues.put(currentOrder, ptr.copyBytes());
+                    topValues.get(currentOrder).push(ptr.copyBytes());
                 }
             }
         } else {
@@ -158,14 +163,17 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
         if (useOffset) {
             payload.setOffset(offset);
 
-            if (topValues.size() == 0) {
+            if (topValuesCount == 0) {
                 return false;
             }
         } else {
             if (topValue == null) {
                 return false;
             }
-            topValues.put(topOrder.getValue(), topValue);
+
+            LinkedList<byte[]> topValueList = new LinkedList<byte[]>();
+            topValueList.push(topValue);
+            topValues.put(topOrder.getValue(), topValueList);
         }
         payload.setData(topValues);
 
@@ -202,4 +210,16 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
             this.isAscending = isAscending;
         }
     }
+
+    private boolean removeLastElement(byte[] currentOrder, byte[] lowestKey, int sortOrderInt) {
+        if (Bytes.compareTo(currentOrder, lowestKey) * sortOrderInt >= 0) {
+            if (topValues.get(lowestKey).size() == 1) {
+                topValues.remove(lowestKey);
+            } else {
+                topValues.get(lowestKey).pollFirst();
+            }
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
index 562f189..b358dce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.util;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -32,7 +34,7 @@ public class FirstLastNthValueDataContainer {
 
     protected boolean isAscending = false;
     protected int offset;
-    protected TreeMap<byte[], byte[]> data;
+    protected TreeMap<byte[], LinkedList<byte[]>> data;
     protected boolean isOrderValuesFixedLength = false;
     protected boolean isDataValuesFixedLength = false;
 
@@ -40,7 +42,7 @@ public class FirstLastNthValueDataContainer {
         isAscending = ascending;
     }
 
-    public void setData(TreeMap<byte[], byte[]> topValues) {
+    public void setData(TreeMap<byte[], LinkedList<byte[]>> topValues) {
         data = topValues;
     }
 
@@ -65,7 +67,7 @@ public class FirstLastNthValueDataContainer {
         int lengthOfDataValues = Bytes.toInt(payload, 5);
         int sizeOfMap = Bytes.toInt(payload, 9);
 
-        data = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+        data = new TreeMap<byte[], LinkedList<byte[]>>(new Bytes.ByteArrayComparator());
 
         int payloadOffset = 13;
 
@@ -93,7 +95,10 @@ public class FirstLastNthValueDataContainer {
                 payloadOffset += l;
             }
 
-            data.put(key, value);
+            if(!data.containsKey(key)) {
+                data.put(key, new LinkedList<byte[]>());
+            }
+            data.get(key).add(value);
         }
 
     }
@@ -127,7 +132,7 @@ public class FirstLastNthValueDataContainer {
 
         bos.write(isAscending ? (byte) 1 : (byte) 0);
 
-        Entry<byte[], byte[]> firstEntry = data.firstEntry();
+        Entry<byte[], LinkedList<byte[]>> firstEntry = data.firstEntry();
         if (isOrderValuesFixedLength) {
             bos.write(Bytes.toBytes(firstEntry.getKey().length));
         } else {
@@ -135,34 +140,44 @@ public class FirstLastNthValueDataContainer {
         }
 
         if (isDataValuesFixedLength) {
-            bos.write(Bytes.toBytes(firstEntry.getValue().length));
+            bos.write(Bytes.toBytes(firstEntry.getValue().getFirst().length));
         } else {
             bos.write(Bytes.toBytes(0));
         }
 
-        bos.write(Bytes.toBytes(data.size()));
-
-        for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
-
-            if (!isOrderValuesFixedLength) {
-                bos.write(Bytes.toBytes(entry.getKey().length));
-            }
-            bos.write(entry.getKey());
-
-            if (!isDataValuesFixedLength) {
-                bos.write(Bytes.toBytes(entry.getValue().length));
+        int offsetForDataLength = bos.size();
+        bos.write(new byte[4]); //space for number of elements
+        int valuesCount = 0;
+
+        for (Map.Entry<byte[], LinkedList<byte[]>> entry : data.entrySet()) {
+            ListIterator<byte[]> it = entry.getValue().listIterator();
+            while(it.hasNext()) {
+                valuesCount++;
+                byte[] itemValue = it.next();
+
+                if (!isOrderValuesFixedLength) {
+                    bos.write(Bytes.toBytes(entry.getKey().length));
+                }
+                bos.write(entry.getKey());
+
+                if (!isDataValuesFixedLength) {
+                    bos.write(Bytes.toBytes(itemValue.length));
+                }
+                bos.write(itemValue);
             }
-            bos.write(entry.getValue());
         }
 
-        return bos.toByteArray();
+        byte[] outputArray = bos.toByteArray();
+        //write number of elements
+        System.arraycopy(Bytes.toBytes(valuesCount), 0, outputArray, offsetForDataLength, 4);
+        return outputArray;
     }
 
     public boolean getIsAscending() {
         return isAscending;
     }
 
-    public TreeMap getData() {
+    public TreeMap<byte[], LinkedList<byte[]>> getData() {
         return data;
     }
 }