You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/20 02:19:46 UTC
[1/3] git commit: PHOENIX-1349 VIEWs do not inherit indexes from their parent
Repository: phoenix
Updated Branches:
refs/heads/4.0 8ba26edb4 -> 90dfe8160
PHOENIX-1349 VIEWs do not inherit indexes from their parent
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b90f5187
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b90f5187
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b90f5187
Branch: refs/heads/4.0
Commit: b90f51876ffa957cdcdb3a5d88adde70c2da9f36
Parents: 8ba26ed
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Oct 18 22:19:26 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:23:43 2014 -0700
----------------------------------------------------------------------
.../apache/phoenix/compile/FromCompiler.java | 4 +-
.../apache/phoenix/compile/JoinCompiler.java | 10 +-
.../coprocessor/MetaDataEndpointImpl.java | 6 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 6 +-
.../apache/phoenix/optimize/QueryOptimizer.java | 2 +-
.../phoenix/query/ConnectionQueryServices.java | 3 +-
.../query/ConnectionQueryServicesImpl.java | 60 ++++++--
.../query/ConnectionlessQueryServicesImpl.java | 6 +-
.../query/DelegateConnectionQueryServices.java | 8 +-
.../apache/phoenix/schema/DelegateTable.java | 5 +
.../apache/phoenix/schema/MetaDataClient.java | 145 ++++++++++++-------
.../java/org/apache/phoenix/schema/PTable.java | 6 +
.../org/apache/phoenix/schema/PTableImpl.java | 85 ++++++-----
.../phoenix/compile/QueryOptimizerTest.java | 4 -
14 files changed, 212 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 6f7b006..0fed42a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -441,8 +441,8 @@ public class FromCompiler {
}
PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
- null, null, columns, null, Collections.<PTable>emptyList(), false,
- Collections.<PName>emptyList(), null, null, false, false, null, null, null);
+ null, null, columns, null, null, Collections.<PTable>emptyList(),
+ false, Collections.<PName>emptyList(), null, null, false, false, null, null, null);
String alias = subselectNode.getAlias();
TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index b6a43b7..ef053de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -744,8 +744,8 @@ public class JoinCompiler {
PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN,
table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
- retainPKColumns ? table.getBucketNum() : null, projectedColumns, table.getParentTableName(),
- table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
+ retainPKColumns ? table.getBucketNum() : null, projectedColumns, table.getParentSchemaName(),
+ table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
}
@@ -794,8 +794,8 @@ public class JoinCompiler {
}
PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN,
table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
- null, projectedColumns, table.getParentTableName(),
- table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
+ null, projectedColumns, table.getParentSchemaName(),
+ table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
}
}
@@ -1300,7 +1300,7 @@ public class JoinCompiler {
}
PTable t = PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(),
PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), left.getBucketNum(), merged,
- left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getViewType(), left.getViewIndexId(), left.getIndexType());
+ left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getViewType(), left.getViewIndexId(), left.getIndexType());
ListMultimap<String, String> mergedMap = ArrayListMultimap.<String, String>create();
mergedMap.putAll(this.getColumnNameMap());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c0d29bf..047a947 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -617,9 +617,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp,
- tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null,
- indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL,
- multiTenant, viewType, viewIndexId, indexType, stats);
+ tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
+ tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
+ disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
}
private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 9a3e399..03c643d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -199,13 +199,13 @@ public abstract class BaseQueryPlan implements QueryPlan {
KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns);
// Set key value schema of the data columns.
serializeSchemaIntoScan(scan, schema);
- String schemaName = context.getCurrentTable().getTable().getSchemaName().getString();
+ String parentSchema = context.getCurrentTable().getTable().getParentSchemaName().getString();
String parentTable = context.getCurrentTable().getTable().getParentTableName().getString();
final ParseNodeFactory FACTORY = new ParseNodeFactory();
TableRef dataTableRef =
FromCompiler.getResolver(
- FACTORY.namedTable(null, TableName.create(schemaName, parentTable)),
- context.getConnection()).resolveTable(schemaName, parentTable);
+ FACTORY.namedTable(null, TableName.create(parentSchema, parentTable)),
+ context.getConnection()).resolveTable(parentSchema, parentTable);
PTable dataTable = dataTableRef.getTable();
// Set index maintainer of the local index.
serializeIndexMaintainerIntoScan(scan, dataTable);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 6b74822..6a68df3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -209,7 +209,7 @@ public class QueryOptimizer {
private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan) throws SQLException {
int nColumns = dataPlan.getProjector().getColumnCount();
String alias = '"' + dataPlan.getTableRef().getTableAlias() + '"'; // double quote in case it's case sensitive
- String schemaName = dataPlan.getTableRef().getTable().getSchemaName().getString();
+ String schemaName = index.getParentSchemaName().getString();
schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"';
String tableName = '"' + index.getTableName().getString() + '"';
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index c017b77..8af9310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -106,8 +106,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
public String getUserName();
public void incrementTableTimeStamp(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, long clientTS) throws SQLException;
- public PTableStats getTableStats(String physicalName);
- public void addTableStats(String physicalName, PTableStats tableStats);
+ public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) throws SQLException;
public void clearCache() throws SQLException;
public int getSequenceSaltBuckets();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 415fbca..d46497d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -32,6 +32,7 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
@@ -97,6 +98,7 @@ import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -123,6 +125,7 @@ import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
@@ -159,7 +162,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private final ReadOnlyProps props;
private final String userName;
private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
- private final Cache<String, PTableStats> tableStatsCache;
+ private final Cache<ImmutableBytesPtr, PTableStats> tableStatsCache;
// Cache the latest meta data here for future connections
// writes guarded by "latestMetaDataLock"
@@ -1015,7 +1018,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
desc = admin.getTableDescriptor(physicalIndexName);
if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_VIEW_INDEX_TABLE_PROP_BYTES)))) {
- this.tableStatsCache.invalidate(Bytes.toString(physicalIndexName));
+ this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
final ReadOnlyProps props = this.getProps();
final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
if (dropMetadata) {
@@ -1050,7 +1053,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
desc = admin.getTableDescriptor(physicalIndexName);
if (Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(desc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
- this.tableStatsCache.invalidate(Bytes.toString(physicalIndexName));
+ this.tableStatsCache.invalidate(new ImmutableBytesPtr(physicalIndexName));
final ReadOnlyProps props = this.getProps();
final boolean dropMetadata = props.getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
if (dropMetadata) {
@@ -1219,11 +1222,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
invalidateTables(result.getTableNamesToDelete());
if (tableType == PTableType.TABLE) {
- byte[] physicalTableName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
+ byte[] physicalName = SchemaUtil.getTableNameAsBytes(schemaBytes, tableBytes);
long timestamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
- ensureViewIndexTableDropped(physicalTableName, timestamp);
- ensureLocalIndexTableDropped(physicalTableName, timestamp);
- tableStatsCache.invalidate(SchemaUtil.getTableName(schemaBytes, tableBytes));
+ ensureViewIndexTableDropped(physicalName, timestamp);
+ ensureLocalIndexTableDropped(physicalName, timestamp);
+ tableStatsCache.invalidate(new ImmutableBytesPtr(physicalName));
}
break;
default:
@@ -1235,7 +1238,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private void invalidateTables(final List<byte[]> tableNamesToDelete) {
if (tableNamesToDelete != null) {
for ( byte[] tableName : tableNamesToDelete ) {
- tableStatsCache.invalidate(Bytes.toString(tableName));
+ tableStatsCache.invalidate(new ImmutableBytesPtr(tableName));
}
}
}
@@ -1913,7 +1916,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
sqlE = new SQLException(e);
} finally {
try {
- if (tenantId.length == 0) tableStatsCache.invalidate(SchemaUtil.getTableName(schemaName, tableName));
+ if (tenantId.length == 0) tableStatsCache.invalidate(new ImmutableBytesPtr(SchemaUtil.getTableNameAsBytes(schemaName, tableName)));
htable.close();
} catch (IOException e) {
if (sqlE == null) {
@@ -2096,15 +2099,42 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private void throwConnectionClosedException() {
throw new IllegalStateException("Connection to the cluster is closed");
}
- @Override
- public PTableStats getTableStats(String physicalName) {
- return tableStatsCache.getIfPresent(physicalName);
- }
@Override
- public void addTableStats(String physicalName, PTableStats tableStats) {
- tableStatsCache.put(physicalName, tableStats);
+ public PTableStats getTableStats(final byte[] physicalName, final long clientTimeStamp) throws SQLException {
+ try {
+ return tableStatsCache.get(new ImmutableBytesPtr(physicalName), new Callable<PTableStats>() {
+ @Override
+ public PTableStats call() throws Exception {
+ /*
+ * The shared view index case is tricky, because we don't have
+ * table metadata for it, only an HBase table. We do have stats,
+ * though, so we'll query them directly here and cache them so
+ * we don't keep querying for them.
+ */
+ HTableInterface statsHTable = ConnectionQueryServicesImpl.this.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+ try {
+ return StatisticsUtil.readStatistics(statsHTable, physicalName, clientTimeStamp);
+ } catch (IOException e) {
+ logger.warn("Unable to read from stats table", e);
+ // Just cache empty stats. We'll try again after some time anyway.
+ return PTableStats.EMPTY_STATS;
+ } finally {
+ try {
+ statsHTable.close();
+ } catch (IOException e) {
+ // Log, but continue. We have our stats anyway now.
+ logger.warn("Unable to close stats table", e);
+ }
+ }
+ }
+
+ });
+ } catch (ExecutionException e) {
+ throw ServerUtil.parseServerException(e);
+ }
}
+
@Override
public int getSequenceSaltBuckets() {
return nSequenceSaltBuckets;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 9bd30a3..b04d29d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -427,15 +427,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
}
@Override
- public PTableStats getTableStats(String physicalName) {
+ public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) {
return PTableStats.EMPTY_STATS;
}
@Override
- public void addTableStats(String physicalName, PTableStats tableStats) {
- }
-
- @Override
public void clearCache() throws SQLException {
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 34bca4d..bb4bb33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -235,14 +235,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
}
@Override
- public PTableStats getTableStats(String physicalName) {
- return getDelegate().getTableStats(physicalName);
+ public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) throws SQLException {
+ return getDelegate().getTableStats(physicalName, clientTimeStamp);
}
- @Override
- public void addTableStats(String physicalName, PTableStats tableStats) {
- getDelegate().addTableStats(physicalName, tableStats);
- }
@Override
public void clearCache() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index f4bf1cc..b7ab899 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -225,4 +225,9 @@ public class DelegateTable implements PTable {
public DelegateTable(PTable delegate) {
this.delegate = delegate;
}
+
+ @Override
+ public PName getParentSchemaName() {
+ return delegate.getParentSchemaName();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 12a11bc..924906e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -88,7 +88,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
@@ -137,7 +136,6 @@ import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.stats.PTableStats;
-import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
@@ -352,6 +350,7 @@ public class MetaDataClient {
// Otherwise, a tenant would be required to create a VIEW first
// which is not really necessary unless you want to filter or add
// columns
+ addIndexesFromPhysicalTable(result);
connection.addTable(resultTable);
return result;
} else {
@@ -365,6 +364,7 @@ public class MetaDataClient {
if (table != null) {
result.setTable(table);
if (code == MutationCode.TABLE_ALREADY_EXISTS) {
+ addIndexesFromPhysicalTable(result);
return result;
}
if (code == MutationCode.TABLE_NOT_FOUND && tryCount + 1 == maxTryCount) {
@@ -377,6 +377,60 @@ public class MetaDataClient {
return result;
}
+
+ /**
+ * Fault in the physical table to the cache and add any indexes it has to the indexes
+ * of the table for which we just updated.
+ * TODO: combine this round trip with the one that updates the cache for the child table.
+ * @param result the result from updating the cache for the current table.
+ * @return true if the PTable contained by result was modified and false otherwise
+ * @throws SQLException if the physical table cannot be found
+ */
+ private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) throws SQLException {
+ PTable table = result.getTable();
+ // If not a view or if a view directly over an HBase table, there's nothing to do
+ if (table.getType() != PTableType.VIEW || table.getViewType() == ViewType.MAPPED) {
+ return false;
+ }
+ String physicalName = table.getPhysicalName().getString();
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
+ String tableName = SchemaUtil.getTableNameFromFullName(physicalName);
+ MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false);
+ PTable physicalTable = parentResult.getTable();
+ if (physicalTable == null) {
+ throw new TableNotFoundException(schemaName, tableName);
+ }
+ if (!result.wasUpdated() && !parentResult.wasUpdated()) {
+ return false;
+ }
+ List<PTable> indexes = physicalTable.getIndexes();
+ if (indexes.isEmpty()) {
+ return false;
+ }
+ // Filter out indexes if column doesn't exist in view
+ List<PTable> allIndexes = Lists.newArrayListWithExpectedSize(indexes.size() + table.getIndexes().size());
+ if (result.wasUpdated()) { // Table from server never contains inherited indexes
+ allIndexes.addAll(table.getIndexes());
+ } else { // Only add original ones, as inherited ones may have changed
+ for (PTable index : indexes) {
+ if (index.getViewIndexId() != null) {
+ allIndexes.add(index);
+ }
+ }
+ }
+ for (PTable index : indexes) {
+ for (PColumn pkColumn : index.getPKColumns()) {
+ try {
+ IndexUtil.getDataColumn(table, pkColumn.getName().getString());
+ allIndexes.add(index);
+ } catch (IllegalArgumentException e) { // Ignore, and continue, as column was not found
+ }
+ }
+ }
+ PTable allIndexesTable = PTableImpl.makePTable(table, table.getTimeStamp(), allIndexes);
+ result.setTable(allIndexesTable);
+ return true;
+ }
private void addColumnMutation(String schemaName, String tableName, PColumn column, PreparedStatement colUpsert, String parentTableName, String pkName, Short keySeq, boolean isSalted) throws SQLException {
@@ -1325,11 +1379,12 @@ public class MetaDataClient {
// Bootstrapping for our SYSTEM.TABLE that creates itself before it exists
if (SchemaUtil.isMetaTable(schemaName,tableName)) {
// TODO: what about stats for system catalog?
- PTable table = PTableImpl.makePTable(tenantId,PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType,
+ PName newSchemaName = PNameFactory.newName(schemaName);
+ PTable table = PTableImpl.makePTable(tenantId,newSchemaName, PNameFactory.newName(tableName), tableType,
null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
- PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, Collections.<PTable>emptyList(),
- isImmutableRows, Collections.<PName>emptyList(),
- defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, Boolean.TRUE.equals(disableWAL), false, null, indexId, indexType);
+ PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, null,
+ Collections.<PTable>emptyList(), isImmutableRows,
+ Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, Boolean.TRUE.equals(disableWAL), false, null, indexId, indexType);
connection.addTable(table);
} else if (tableType == PTableType.INDEX && indexId == null) {
if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -1471,11 +1526,12 @@ public class MetaDataClient {
connection.addTable(result.getTable());
throw new ConcurrentTableMutationException(schemaName, tableName);
default:
+ PName newSchemaName = PNameFactory.newName(schemaName);
PTable table = PTableImpl.makePTable(
- tenantId, PNameFactory.newName(schemaName), PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(),
+ tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(),
PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
- dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows, physicalNames,
- defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, viewType, indexId, indexType);
+ dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
+ physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, viewType, indexId, indexType);
connection.addTable(table);
return table;
}
@@ -2374,57 +2430,36 @@ public class MetaDataClient {
}
public PTableStats getTableStats(PTable table) throws SQLException {
- boolean isView = table.getType() == PTableType.VIEW;
+ /*
+ * The shared view index case is tricky, because we don't have
+ * table meta data for it, only an HBase table. We do have stats,
+ * though, so we'll query them directly here and cache them so
+ * we don't keep querying for them.
+ */
boolean isSharedIndex = table.getViewIndexId() != null;
- if (!isView && !isSharedIndex) {
- return table.getTableStats();
+ if (isSharedIndex) {
+ return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getClientTimeStamp());
}
+ boolean isView = table.getType() == PTableType.VIEW;
String physicalName = table.getPhysicalName().getString();
- // If we have a VIEW or a local or view INDEX, check our cache rather
- // than updating the cache for that table to prevent an extra roundtrip.
- PTableStats tableStats = connection.getQueryServices().getTableStats(physicalName);
- if (tableStats != null) {
- return tableStats;
- }
- if (isView) {
- String physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
- String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName);
- MetaDataMutationResult result = updateCache(null, /* use global tenant id to get physical table */
- physicalSchemaName, physicalTableName);
- PTable physicalTable = result.getTable();
- if(physicalTable == null) {
- // We should be able to find the physical table, as we found the logical one
- // Might mean the physical table as just deleted.
- logger.warn("Unable to retrieve physical table " + physicalName + " for table " + table.getName().getString());
- throw new TableNotFoundException(table.getSchemaName().getString(),table.getTableName().getString());
- }
- tableStats = physicalTable.getTableStats();
- } else {
- /*
- * Otherwise, we have a shared view. This case is tricky, because we don't have
- * table metadata for it, only an HBase table. We do have stats, though, so we'll
- * query them directly here and cache them so we don't keep querying for them.
- */
- HTableInterface statsHTable = connection.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+ if (isView && table.getViewType() != ViewType.MAPPED) {
try {
- long clientTimeStamp = getClientTimeStamp();
- tableStats = StatisticsUtil.readStatistics(statsHTable, table.getPhysicalName().getBytes(), clientTimeStamp);
- } catch (IOException e) {
- logger.warn("Unable to read from stats table", e);
- // Just cache empty stats. We'll try again after some time anyway.
- tableStats = PTableStats.EMPTY_STATS;
- } finally {
- try {
- statsHTable.close();
- } catch (IOException e) {
- // Log, but continue. We have our stats anyway now.
- logger.warn("Unable to close stats table", e);
- }
+ return connection.getMetaDataCache().getTable(new PTableKey(null, physicalName)).getTableStats();
+ } catch (TableNotFoundException e) {
+ // Possible when the table timestamp == current timestamp - 1.
+ // This would be most likely during the initial index build of a view index
+ // where we're doing an upsert select from the tenant specific table.
+ // TODO: would we want to always load the physical table in updateCache in
+ // this case too, as we might not update the view with all of it's indexes?
+ String physicalSchemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
+ String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName);
+ MetaDataMutationResult result = updateCache(null, physicalSchemaName, physicalTableName, false);
+ if (result.getTable() == null) {
+ throw new TableNotFoundException(physicalSchemaName, physicalTableName);
+ }
+ return result.getTable().getTableStats();
}
}
- // Cache these stats so that we don't keep making a roundrip just to get the stats (as
- // they don't change very often.
- connection.getQueryServices().addTableStats(physicalName, tableStats);
- return tableStats;
+ return table.getTableStats();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 4193200..d59e9c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -285,6 +285,12 @@ public interface PTable {
* on or null if not an index.
*/
PName getParentTableName();
+ /**
+ * Gets the schema name of the data table for an index table.
+ * @return the schema name of the data table that this index is
+ * on or null if not an index.
+ */
+ PName getParentSchemaName();
/**
* For a view, return the name of table in Phoenix that physically stores data.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 8f88e51..2448f39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -103,6 +103,7 @@ public class PTableImpl implements PTable {
private List<PTable> indexes;
// Data table name that the index is created on.
private PName parentName;
+ private PName parentSchemaName;
private PName parentTableName;
private List<PName> physicalNames;
private boolean isImmutableRows;
@@ -173,74 +174,78 @@ public class PTableImpl implements PTable {
}
public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes) throws SQLException {
+ return makePTable(table, timeStamp, indexes, table.getSchemaName());
+ }
+
+ public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes, PName parentSchemaName) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
- table.getSequenceNumber() + 1, table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), indexes,
- table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
- table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+ table.getSequenceNumber() + 1, table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
+ indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+ table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
}
public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
- table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(),
- table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
- table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+ table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
+ table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+ table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
- sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
- table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(),
- table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+ sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
+ table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
+ table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
}
public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
- sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(),
- isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
- table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+ sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
+ table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+ table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
}
public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
return new PTableImpl(
table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), state, table.getTimeStamp(),
table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table),
- table.getParentTableName(), table.getIndexes(), table.isImmutableRows(),
- table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
- table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
+ table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
+ table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+ table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats());
}
public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
- PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
- List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType,
- Short viewIndexId, IndexType indexType) throws SQLException {
- return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName,
- indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType,
- PTableStats.EMPTY_STATS);
+ PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
+ boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
+ ViewType viewType, Short viewIndexId, IndexType indexType) throws SQLException {
+ return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
+ dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId,
+ indexType, PTableStats.EMPTY_STATS);
}
public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
- List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
- List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
- boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats)
+ List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
+ boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
+ boolean disableWAL, boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats)
throws SQLException {
return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
- bucketNum, columns, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
- viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
+ bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
+ defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
}
private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns,
- PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
- PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType,
- Short viewIndexId, IndexType indexType, PTableStats stats) throws SQLException {
+ PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
+ List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
+ ViewType viewType, Short viewIndexId, IndexType indexType, PTableStats stats) throws SQLException {
init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
- stats, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression,
- disableWAL, multiTenant, viewType, viewIndexId, indexType);
+ stats, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, // forward the caller-supplied parent schema, not this table's own schemaName
+ viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType);
}
@Override
@@ -260,9 +265,9 @@ public class PTableImpl implements PTable {
}
private void init(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
- PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes,
- boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
- ViewType viewType, Short viewIndexId, IndexType indexType ) throws SQLException {
+ PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
+ List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
+ boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType ) throws SQLException {
Preconditions.checkNotNull(schemaName);
Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -379,9 +384,10 @@ public class PTableImpl implements PTable {
estimatedSize += index.getEstimatedSize();
}
+ this.parentSchemaName = parentSchemaName;
this.parentTableName = parentTableName;
this.parentName = parentTableName == null ? null : PNameFactory.newName(SchemaUtil.getTableName(
- schemaName.getString(), parentTableName.getString()));
+ parentSchemaName.getString(), parentTableName.getString()));
estimatedSize += PNameFactory.getEstimatedSize(this.parentName);
this.physicalNames = physicalNames == null ? ImmutableList.<PName>of() : ImmutableList.copyOf(physicalNames);
@@ -911,8 +917,8 @@ public class PTableImpl implements PTable {
try {
PTableImpl result = new PTableImpl();
result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
- (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, dataTableName,indexes, isImmutableRows,
- physicalNames, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId, indexType);
+ (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
+ isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId, indexType);
return result;
} catch (SQLException e) {
throw new RuntimeException(e); // Impossible
@@ -1005,5 +1011,10 @@ public class PTableImpl implements PTable {
public PTableStats getTableStats() {
return tableStats;
}
+
+ @Override
+ public PName getParentSchemaName() {
+ return parentSchemaName;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b90f5187/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 81f4a45..c046bc6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.base.Joiner;
@@ -336,7 +335,6 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
}
@Test
- @Ignore // FIXME : https://issues.apache.org/jira/browse/PHOENIX-1302
// Multi-tenant = true; Query uses index = true; Salted = false
public void testAssertQueryPlanDetails3() throws Exception {
testAssertQueryPlanDetails(true, true, true);
@@ -361,7 +359,6 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
}
@Test
- @Ignore // FIXME : https://issues.apache.org/jira/browse/PHOENIX-1302
// Multi-tenant = true; Query uses index = true; Salted = false
public void testAssertQueryPlanDetails7() throws Exception {
testAssertQueryPlanDetails(true, true, false);
@@ -479,7 +476,6 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
}
@Test
- @Ignore // FIXME : https://issues.apache.org/jira/browse/PHOENIX-1302
public void testAssertQueryAgainstTenantSpecificViewGoesThroughIndex() throws Exception {
Connection conn = DriverManager.getConnection(getUrl(), new Properties());
[2/3] git commit: PHOENIX-1271 hide tenant column in tenant-specific
connections (Bruno Dumon)
Posted by ja...@apache.org.
PHOENIX-1271 hide tenant column in tenant-specific
connections (Bruno Dumon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bed96f15
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bed96f15
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bed96f15
Branch: refs/heads/4.0
Commit: bed96f15d80d4565dd116b57a2677bb8f3b65a31
Parents: b90f518
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 19 15:47:39 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:24:11 2014 -0700
----------------------------------------------------------------------
.../end2end/TenantSpecificTablesDDLIT.java | 6 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 86 +++++++++++++++++++-
2 files changed, 84 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bed96f15/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index a9fa412..589e963 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -556,8 +556,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
assertTrue(rs.next());
assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "user");
assertTrue(rs.next());
- assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "tenant_id");
- assertTrue(rs.next());
+ // (tenant_id column is not visible in tenant-specific connection)
assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "tenant_type_id");
assertTrue(rs.next());
assertColumnMetaData(rs, null, TENANT_TABLE_NAME, "id");
@@ -566,8 +565,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
assertTrue(rs.next());
assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "user");
assertTrue(rs.next());
- assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "tenant_id");
- assertTrue(rs.next());
+ // (tenant_id column is not visible in tenant-specific connection)
assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "id");
assertTrue(rs.next());
assertColumnMetaData(rs, null, TENANT_TABLE_NAME_NO_TENANT_TYPE_ID, "tenant_col");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bed96f15/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 7a1f2be..54dfae3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -42,7 +42,9 @@ import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
import org.apache.phoenix.expression.function.SQLTableTypeFunction;
import org.apache.phoenix.expression.function.SQLViewTypeFunction;
import org.apache.phoenix.expression.function.SqlTypeNameFunction;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.iterate.DelegateResultIterator;
import org.apache.phoenix.iterate.MaterializedResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.parse.HintNode.Hint;
@@ -372,7 +374,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
}
private static void appendConjunction(StringBuilder buf) {
- buf.append(buf.length() == 0 ? " where " : " and ");
+ buf.append(buf.length() == 0 ? "" : " and ");
}
@Override
@@ -404,7 +406,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
ARRAY_SIZE + "," +
COLUMN_FAMILY + "," +
DATA_TYPE + " " + TYPE_ID + "," +// raw type id for potential internal consumption
- VIEW_CONSTANT +
+ VIEW_CONSTANT + "," +
+ MULTI_TENANT + "," +
+ KEY_SEQ +
" from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS);
StringBuilder where = new StringBuilder();
addTenantIdFilter(where, catalog);
@@ -443,12 +447,86 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
appendConjunction(where);
where.append(COLUMN_NAME + " is not null" );
}
- buf.append(where);
+ boolean isTenantSpecificConnection = connection.getTenantId() != null;
+ if (isTenantSpecificConnection) {
+ buf.append(" where (" + where + ") OR ("
+ + COLUMN_FAMILY + " is null AND " + COLUMN_NAME + " is null)");
+ } else {
+ buf.append(" where " + where);
+ }
buf.append(" order by " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + ORDINAL_POSITION);
- Statement stmt = connection.createStatement();
+
+ Statement stmt;
+ if (isTenantSpecificConnection) {
+ stmt = connection.createStatement(new PhoenixStatementFactory() {
+ @Override
+ public PhoenixStatement newStatement(PhoenixConnection connection) {
+ return new PhoenixStatement(connection) {
+ @Override
+ protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector)
+ throws SQLException {
+ return new PhoenixResultSet(
+ new TenantColumnFilteringIterator(iterator, projector),
+ projector, this);
+ }
+ };
+ }
+ });
+ } else {
+ stmt = connection.createStatement();
+ }
return stmt.executeQuery(buf.toString());
}
+ /**
+ * Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition).
+ * The tenant id is by definition the first column of the primary key, but the primary key does not necessarily
+ * start at the first column. Assumes columns are sorted on ordinal position. Rows whose column family and column name are both null are treated as table rows: they set the multi-tenant flag for the rows that follow and are never returned.
+ */
+ private static class TenantColumnFilteringIterator extends DelegateResultIterator {
+ private final RowProjector rowProjector;
+ private final int columnFamilyIndex;
+ private final int columnNameIndex;
+ private final int multiTenantIndex;
+ private final int keySeqIndex;
+ private boolean inMultiTenantTable;
+
+ private TenantColumnFilteringIterator(ResultIterator delegate, RowProjector rowProjector) throws SQLException {
+ super(delegate);
+ this.rowProjector = rowProjector;
+ this.columnFamilyIndex = rowProjector.getColumnIndex(COLUMN_FAMILY);
+ this.columnNameIndex = rowProjector.getColumnIndex(COLUMN_NAME);
+ this.multiTenantIndex = rowProjector.getColumnIndex(MULTI_TENANT);
+ this.keySeqIndex = rowProjector.getColumnIndex(KEY_SEQ);
+ }
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple tuple = super.next();
+
+ while (tuple != null
+ && getColumn(tuple, columnFamilyIndex) == null && getColumn(tuple, columnNameIndex) == null) {
+ // new table, check if it is multitenant (value comparison: do not rely on the boxed Boolean's identity; null counts as false)
+ inMultiTenantTable = Boolean.TRUE.equals(getColumn(tuple, multiTenantIndex));
+ // skip row representing table
+ tuple = super.next();
+ }
+
+ if (tuple != null && inMultiTenantTable && Short.valueOf((short) 1).equals(getColumn(tuple, keySeqIndex))) {
+ // skip tenant id primary key column (KEY_SEQ == 1) and return whatever row follows it
+ return next();
+ }
+
+ return tuple;
+ }
+
+ private Object getColumn(Tuple tuple, int index) throws SQLException {
+ ColumnProjector projector = this.rowProjector.getColumnProjector(index);
+ PDataType type = projector.getExpression().getDataType();
+ return projector.getValue(tuple, type, new ImmutableBytesPtr());
+ }
+ }
+
@Override
public Connection getConnection() throws SQLException {
return connection;
[3/3] git commit: PHOENIX-1344 fix duplicit values in NTH_VALUE
function (Vaclav Loffelmann)
Posted by ja...@apache.org.
PHOENIX-1344 fix duplicit values in NTH_VALUE function (Vaclav Loffelmann)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/90dfe816
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/90dfe816
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/90dfe816
Branch: refs/heads/4.0
Commit: 90dfe8160c493884ebac80e9f14ad54ee443fdb6
Parents: bed96f1
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 19 17:18:21 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 19 17:24:26 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/NthValueFunctionIT.java | 150 +++++++++++++++++++
.../FirstLastValueBaseClientAggregator.java | 37 +++--
.../FirstLastValueServerAggregator.java | 46 ++++--
.../util/FirstLastNthValueDataContainer.java | 55 ++++---
4 files changed, 244 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
index 1cf2643..ec5bf96 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.util.ArrayList;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -152,4 +153,153 @@ public class NthValueFunctionIT extends BaseHBaseManagedTimeIT {
assertFalse(rs.next());
}
+ @Test
+ public void nonUniqueValuesInOrderByAsc() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+ + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+ + " dates INTEGER, val INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 3) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+ assertTrue(rs.next());
+ assertInIntArray(new int[]{2, 4, 9}, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
+ @Test
+ public void nonUniqueValuesInOrderByAscSkipDuplicit() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+ + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+ + " dates INTEGER, val INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 5) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
+ @Test
+ public void nonUniqueValuesInOrderByDesc() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+ + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+ + " dates INTEGER, val INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 3) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+ assertTrue(rs.next());
+ assertInIntArray(new int[]{2, 4, 9}, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
+ @Test
+ public void nonUniqueValuesInOrderNextValueDesc() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+ + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+ + " dates INTEGER, val INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 0, 7)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 1, 9)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (7, 8, 3, 5)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 2) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+ assertTrue(rs.next());
+ assertInIntArray(new int[]{3, 5}, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
+ @Test
+ public void nonUniqueValuesInOrderNextValueAsc() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+ + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+ + " dates INTEGER, val INTEGER)";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 0, 7)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 1, 9)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 2, 4)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 2, 2)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (6, 8, 3, 3)");
+ conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (7, 8, 3, 5)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 5) WITHIN GROUP (ORDER BY dates ASC) FROM nthValue GROUP BY page_id");
+
+ assertTrue(rs.next());
+ assertInIntArray(new int[]{3, 5}, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
+ @Test
+ public void ignoreNullValues() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+ + "(id INTEGER NOT NULL, page_id UNSIGNED_LONG,"
+ + " dates BIGINT NOT NULL, \"value\" BIGINT NULL CONSTRAINT pk PRIMARY KEY (id, dates))";
+ conn.createStatement().execute(ddl);
+
+ conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (1, 8, 1, 1)");
+ conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (2, 8, 2, NULL)");
+ conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (3, 8, 3, NULL)");
+ conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (5, 8, 4, 4)");
+ conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (4, 8, 5, 5)");
+ conn.commit();
+
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT NTH_VALUE(\"value\", 2) WITHIN GROUP (ORDER BY dates DESC) FROM nth_test_table GROUP BY page_id");
+
+ assertTrue(rs.next());
+ assertEquals(4, rs.getLong(1)); // expected value first; by dates DESC the values are 5, 4, NULL, NULL, 1, so the 2nd is 4
+ assertFalse(rs.next());
+ }
+
+ private void assertInIntArray(int[] should, int actualValue) {
+ ArrayList<Integer> shouldList = new ArrayList<Integer>();
+ for (int i: should) {
+ shouldList.add(i);
+ }
+ assertTrue(shouldList.contains(actualValue));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
index fde79ba..6dfca39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.expression.aggregator;
+import java.util.LinkedList;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -42,7 +44,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
protected int offset = -1;
protected BinaryComparator topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
protected byte[] topValue = null;
- protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new ByteArrayComparator());
+ protected TreeMap<byte[], LinkedList<byte[]>> topValues = new TreeMap<byte[], LinkedList<byte[]>>(new ByteArrayComparator());
protected boolean isAscending;
public FirstLastValueBaseClientAggregator() {
@@ -63,7 +65,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
return false;
}
- Set<Map.Entry<byte[], byte[]>> entrySet;
+ Set<Map.Entry<byte[], LinkedList<byte[]>>> entrySet;
if (isAscending) {
entrySet = topValues.entrySet();
} else {
@@ -71,10 +73,14 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
}
int counter = offset;
- for (Map.Entry<byte[], byte[]> entry : entrySet) {
- if (--counter == 0) {
- ptr.set(entry.getValue());
- return true;
+ for (Map.Entry<byte[], LinkedList<byte[]>> entry : entrySet) {
+ ListIterator<byte[]> it = entry.getValue().listIterator();
+ while (it.hasNext()) {
+ if (--counter == 0) {
+ ptr.set(it.next());
+ return true;
+ }
+ it.next();
}
}
@@ -103,13 +109,22 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
payload.setPayload(ptr.copyBytes());
isAscending = payload.getIsAscending();
- TreeMap serverAggregatorResult = payload.getData();
+ TreeMap<byte[], LinkedList<byte[]>> serverAggregatorResult = payload.getData();
if (useOffset) {
- payload.setOffset(offset);
- topValues.putAll(serverAggregatorResult);
+ //merge topValues
+ for (Entry<byte[], LinkedList<byte[]>> entry : serverAggregatorResult.entrySet()) {
+ byte[] itemKey = entry.getKey();
+ LinkedList<byte[]> itemList = entry.getValue();
+
+ if (topValues.containsKey(itemKey)) {
+ topValues.get(itemKey).addAll(itemList);
+ } else {
+ topValues.put(itemKey, itemList);
+ }
+ }
} else {
- Entry<byte[], byte[]> valueEntry = serverAggregatorResult.firstEntry();
+ Entry<byte[], LinkedList<byte[]>> valueEntry = serverAggregatorResult.firstEntry();
byte[] currentOrder = valueEntry.getKey();
boolean isBetter;
@@ -120,7 +135,7 @@ public class FirstLastValueBaseClientAggregator extends BaseAggregator {
}
if (topOrder.getValue().length < 1 || isBetter) {
topOrder = new BinaryComparator(currentOrder);
- topValue = valueEntry.getValue();
+ topValue = valueEntry.getValue().getFirst();
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
index 90b7826..5e51e07 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.SizedUtil;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,11 +46,12 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
protected byte[] topValue;
protected boolean useOffset = false;
protected int offset = -1;
- protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+ protected TreeMap<byte[], LinkedList<byte[]>> topValues = new TreeMap<byte[], LinkedList<byte[]>>(new Bytes.ByteArrayComparator());
protected boolean isAscending;
protected boolean hasValueDescSortOrder;
protected Expression orderByColumn;
protected Expression dataColumn;
+ protected int topValuesCount = 0;
public FirstLastValueServerAggregator() {
super(SortOrder.getDefault());
@@ -60,6 +62,7 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
topValue = null;
topValues.clear();
+ topValuesCount = 0;
offset = -1;
useOffset = false;
}
@@ -81,7 +84,7 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
if (useOffset) {
boolean addFlag = false;
- if (topValues.size() < offset) {
+ if (topValuesCount < offset) {
try {
addFlag = true;
} catch (Exception e) {
@@ -89,25 +92,27 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
}
} else {
if (isAscending) {
- byte[] lowestKey = topValues.lastKey();
- if (Bytes.compareTo(currentOrder, lowestKey) < 0) {
- topValues.remove(lowestKey);
+ if (removeLastElement(currentOrder, topValues.lastKey(), -1)) {
addFlag = true;
+ topValuesCount--;
}
- } else { //desc
- byte[] highestKey = topValues.firstKey();
- if (Bytes.compareTo(currentOrder, highestKey) > 0) {
- topValues.remove(highestKey);
+ } else {
+ if (removeLastElement(currentOrder, topValues.firstKey(), 1)) {
addFlag = true;
+ topValuesCount--;
}
}
}
if (addFlag) {
+ topValuesCount++;
+ if (!topValues.containsKey(currentOrder)) {
+ topValues.put(currentOrder, new LinkedList<byte[]>());
+ }
//invert bytes if is SortOrder set
if (hasValueDescSortOrder) {
- topValues.put(currentOrder, SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()));
+ topValues.get(currentOrder).push(SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()));
} else {
- topValues.put(currentOrder, ptr.copyBytes());
+ topValues.get(currentOrder).push(ptr.copyBytes());
}
}
} else {
@@ -158,14 +163,17 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
if (useOffset) {
payload.setOffset(offset);
- if (topValues.size() == 0) {
+ if (topValuesCount == 0) {
return false;
}
} else {
if (topValue == null) {
return false;
}
- topValues.put(topOrder.getValue(), topValue);
+
+ LinkedList<byte[]> topValueList = new LinkedList<byte[]>();
+ topValueList.push(topValue);
+ topValues.put(topOrder.getValue(), topValueList);
}
payload.setData(topValues);
@@ -202,4 +210,16 @@ public class FirstLastValueServerAggregator extends BaseAggregator {
this.isAscending = isAscending;
}
}
+
+ private boolean removeLastElement(byte[] currentOrder, byte[] lowestKey, int sortOrderInt) {
+ if (Bytes.compareTo(currentOrder, lowestKey) * sortOrderInt >= 0) {
+ if (topValues.get(lowestKey).size() == 1) {
+ topValues.remove(lowestKey);
+ } else {
+ topValues.get(lowestKey).pollFirst();
+ }
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90dfe816/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
index 562f189..b358dce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -32,7 +34,7 @@ public class FirstLastNthValueDataContainer {
protected boolean isAscending = false;
protected int offset;
- protected TreeMap<byte[], byte[]> data;
+ protected TreeMap<byte[], LinkedList<byte[]>> data;
protected boolean isOrderValuesFixedLength = false;
protected boolean isDataValuesFixedLength = false;
@@ -40,7 +42,7 @@ public class FirstLastNthValueDataContainer {
isAscending = ascending;
}
- public void setData(TreeMap<byte[], byte[]> topValues) {
+ public void setData(TreeMap<byte[], LinkedList<byte[]>> topValues) {
data = topValues;
}
@@ -65,7 +67,7 @@ public class FirstLastNthValueDataContainer {
int lengthOfDataValues = Bytes.toInt(payload, 5);
int sizeOfMap = Bytes.toInt(payload, 9);
- data = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+ data = new TreeMap<byte[], LinkedList<byte[]>>(new Bytes.ByteArrayComparator());
int payloadOffset = 13;
@@ -93,7 +95,10 @@ public class FirstLastNthValueDataContainer {
payloadOffset += l;
}
- data.put(key, value);
+ if(!data.containsKey(key)) {
+ data.put(key, new LinkedList<byte[]>());
+ }
+ data.get(key).add(value);
}
}
@@ -127,7 +132,7 @@ public class FirstLastNthValueDataContainer {
bos.write(isAscending ? (byte) 1 : (byte) 0);
- Entry<byte[], byte[]> firstEntry = data.firstEntry();
+ Entry<byte[], LinkedList<byte[]>> firstEntry = data.firstEntry();
if (isOrderValuesFixedLength) {
bos.write(Bytes.toBytes(firstEntry.getKey().length));
} else {
@@ -135,34 +140,44 @@ public class FirstLastNthValueDataContainer {
}
if (isDataValuesFixedLength) {
- bos.write(Bytes.toBytes(firstEntry.getValue().length));
+ bos.write(Bytes.toBytes(firstEntry.getValue().getFirst().length));
} else {
bos.write(Bytes.toBytes(0));
}
- bos.write(Bytes.toBytes(data.size()));
-
- for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
-
- if (!isOrderValuesFixedLength) {
- bos.write(Bytes.toBytes(entry.getKey().length));
- }
- bos.write(entry.getKey());
-
- if (!isDataValuesFixedLength) {
- bos.write(Bytes.toBytes(entry.getValue().length));
+ int offsetForDataLength = bos.size();
+ bos.write(new byte[4]); //space for number of elements
+ int valuesCount = 0;
+
+ for (Map.Entry<byte[], LinkedList<byte[]>> entry : data.entrySet()) {
+ ListIterator<byte[]> it = entry.getValue().listIterator();
+ while(it.hasNext()) {
+ valuesCount++;
+ byte[] itemValue = it.next();
+
+ if (!isOrderValuesFixedLength) {
+ bos.write(Bytes.toBytes(entry.getKey().length));
+ }
+ bos.write(entry.getKey());
+
+ if (!isDataValuesFixedLength) {
+ bos.write(Bytes.toBytes(itemValue.length));
+ }
+ bos.write(itemValue);
}
- bos.write(entry.getValue());
}
- return bos.toByteArray();
+ byte[] outputArray = bos.toByteArray();
+ //write number of elements
+ System.arraycopy(Bytes.toBytes(valuesCount), 0, outputArray, offsetForDataLength, 4);
+ return outputArray;
}
public boolean getIsAscending() {
return isAscending;
}
- public TreeMap getData() {
+ public TreeMap<byte[], LinkedList<byte[]>> getData() {
return data;
}
}