You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/11/04 22:14:15 UTC
[48/50] [abbrv] phoenix git commit: Fail-fast iterators for
EncodedColumnQualifierCellsList. Use list iterators instead of get(index) for
navigating lists. Use HBase bytes utility for encoded column names. Fix test
failures for immutable tables and index
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ede568e9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9a7b9e3..93a87ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
@@ -34,6 +35,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
@@ -57,6 +59,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
@@ -74,11 +77,11 @@ import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_
import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;
import java.io.IOException;
-import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
@@ -150,14 +153,12 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
-import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -190,8 +191,10 @@ import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
@@ -209,10 +212,12 @@ import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
@@ -282,6 +287,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
+ private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
EMPTY_KEYVALUE_KV,
@@ -308,7 +314,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
UPDATE_CACHE_FREQUENCY_KV,
IS_NAMESPACE_MAPPED_KV,
AUTO_PARTITION_SEQ_KV,
- APPEND_ONLY_SCHEMA_KV
+ APPEND_ONLY_SCHEMA_KV,
+ STORAGE_SCHEME_KV
);
static {
Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -338,6 +345,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
+ private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -351,6 +359,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
+ private static final KeyValue ENCODED_COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODED_COLUMN_QUALIFIER_BYTES);
private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
DECIMAL_DIGITS_KV,
COLUMN_SIZE_KV,
@@ -363,11 +372,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
VIEW_CONSTANT_KV,
IS_VIEW_REFERENCED_KV,
COLUMN_DEF_KV,
- IS_ROW_TIMESTAMP_KV
+ IS_ROW_TIMESTAMP_KV,
+ ENCODED_COLUMN_QUALIFIER_KV
);
static { // sorts COLUMN_KV_COLUMNS in place with KeyValue.COMPARATOR; the *_INDEX constants declared after this block are read from the sorted list via indexOf
Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
}
+ private static final KeyValue QUALIFIER_COUNTER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES);
private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV);
private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
@@ -379,9 +390,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
+ private static final int ENCODED_COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(ENCODED_COLUMN_QUALIFIER_KV);
private static final int LINK_TYPE_INDEX = 0;
-
+
private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
@@ -717,8 +729,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
isRowTimestampKV.getValueLength()));
-
- PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
+ Cell columnQualifierKV = colKeyValues[ENCODED_COLUMN_QUALIFIER_INDEX];
+ Integer columnQualifier =
+ columnQualifierKV == null ? null : getEncodedColumnQualifier(
+ columnQualifierKV.getValueArray(), columnQualifierKV.getValueOffset(),
+ columnQualifierKV.getValueLength());
+ PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifier);
columns.add(column);
}
@@ -926,37 +942,49 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
: Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
-
-
+ Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
+ //TODO: change this once we start having other values for storage schemes
+ StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme
+ .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
+ storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
+
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = Lists.newArrayList();
List<PName> physicalTables = Lists.newArrayList();
PName parentTableName = tableType == INDEX ? dataTableName : null;
PName parentSchemaName = tableType == INDEX ? schemaName : null;
+ EncodedCQCounter cqCounter = (storageScheme == StorageScheme.NON_ENCODED_COLUMN_NAMES || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER : new EncodedCQCounter();
while (true) {
- results.clear();
- scanner.next(results);
- if (results.isEmpty()) {
- break;
- }
- Cell colKv = results.get(LINK_TYPE_INDEX);
- int colKeyLength = colKv.getRowLength();
- PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
- int colKeyOffset = offset + colName.getBytes().length + 1;
- PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
- if (colName.getString().isEmpty() && famName != null) {
- LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
- if (linkType == LinkType.INDEX_TABLE) {
- addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
- } else if (linkType == LinkType.PHYSICAL_TABLE) {
- physicalTables.add(famName);
- } else if (linkType == LinkType.PARENT_TABLE) {
- parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
- parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
- }
- } else {
- addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
- }
+ results.clear();
+ scanner.next(results);
+ if (results.isEmpty()) {
+ break;
+ }
+ Cell colKv = results.get(LINK_TYPE_INDEX);
+ if (colKv != null) {
+ int colKeyLength = colKv.getRowLength();
+ PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+ int colKeyOffset = offset + colName.getBytes().length + 1;
+ PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+ if (isQualifierCounterKV(colKv)) {
+ Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
+ cqCounter.setValue(famName.getString(), value);
+ } else {
+ if (colName.getString().isEmpty() && famName != null) {
+ LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+ if (linkType == LinkType.INDEX_TABLE) {
+ addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+ } else if (linkType == LinkType.PHYSICAL_TABLE) {
+ physicalTables.add(famName);
+ } else if (linkType == LinkType.PARENT_TABLE) {
+ parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
+ parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
+ }
+ } else {
+ addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
+ }
+ }
+ }
}
// Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
// server while holding this lock is a bad idea and likely to cause contention.
@@ -964,9 +992,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
- indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema);
+ indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, cqCounter);
}
-
+
+ private boolean isQualifierCounterKV(Cell kv) { // true when kv's column qualifier bytes equal the qualifier of QUALIFIER_COUNTER_KV
+ int cmp =
+ Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), // only the qualifier slices are compared; row, family and value are not consulted
+ kv.getQualifierLength(), QUALIFIER_COUNTER_KV.getQualifierArray(),
+ QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength());
+ return cmp == 0; // a zero comparison result means the two qualifier byte ranges are identical
+ }
+
private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
List<Cell> results = Lists.newArrayList();
scanner.next(results);
@@ -1486,46 +1522,46 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
Short indexId = null;
if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
- try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)){
- PName physicalName = parentTable.getPhysicalName();
- int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
- SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
+ try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+ PName physicalName = parentTable.getPhysicalName();
+ int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
// TODO Review: Earlier, the sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should
// always use LATEST_TIMESTAMP so that connections do not see wrong sequence values depending on
// whether or not they have an SCN.
- long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
- try {
- connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
+ long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
+ try {
+ connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp);
- } catch (SequenceAlreadyExistsException e) {
- }
- long[] seqValues = new long[1];
- SQLException[] sqlExceptions = new SQLException[1];
- connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
+ } catch (SequenceAlreadyExistsException e) {
+ }
+ long[] seqValues = new long[1];
+ SQLException[] sqlExceptions = new SQLException[1];
+ connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions);
- if (sqlExceptions[0] != null) {
- throw sqlExceptions[0];
- }
- long seqValue = seqValues[0];
- if (seqValue > Short.MAX_VALUE) {
- builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
- builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
- done.run(builder.build());
- return;
- }
- Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
- NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
- List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
- Cell cell = cells.get(0);
- PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
- Object val = dataType.toObject(seqValue, PLong.INSTANCE);
- byte[] bytes = new byte [dataType.getByteSize() + 1];
- dataType.toBytes(val, bytes, 0);
- Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
+ if (sqlExceptions[0] != null) {
+ throw sqlExceptions[0];
+ }
+ long seqValue = seqValues[0];
+ if (seqValue > Short.MAX_VALUE) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ done.run(builder.build());
+ return;
+ }
+ Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
+ NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
+ List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
+ Cell cell = cells.get(0);
+ PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
+ Object val = dataType.toObject(seqValue, PLong.INSTANCE);
+ byte[] bytes = new byte [dataType.getByteSize() + 1];
+ dataType.toBytes(val, bytes, 0);
+ Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
- cells.add(indexIdCell);
- indexId = (short) seqValue;
+ cells.add(indexIdCell);
+ indexId = (short) seqValue;
}
}
@@ -1536,7 +1572,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// indexing on the system table. This is an issue because of the way we manage batch mutation
// in the Indexer.
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
-
+
// Invalidate the cache - the next getTable call will add it
// TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -1988,7 +2024,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return result;
}
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
- // Invalidate from cache
+ // Invalidate from cache.
for (ImmutableBytesPtr invalidateKey : invalidateList) {
metaDataCache.invalidate(invalidateKey);
}
@@ -2161,6 +2197,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[][] rkmd = new byte[5][];
int pkCount = getVarChars(m.getRow(), rkmd);
if (pkCount > COLUMN_NAME_INDEX
+ && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0
&& Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
@@ -2195,8 +2232,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
try {
- existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily(
- columnFamily).getColumn(columnName);
+ existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily(
+ columnFamily).getPColumnForColumnName(columnName);
} catch (ColumnFamilyNotFoundException e) {
// ignore since it means that the column family is not present for the column to be added.
} catch (ColumnNotFoundException e) {
@@ -2323,26 +2360,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
columnsAddedToBaseTable++;
}
}
- /*
- * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
- * table pk columns 2. if we are adding all the existing view pk columns to the base table
- */
- if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
- }
- addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
- deltaNumPkColsSoFar);
-
- /*
- * Increment the sequence number by 1 if:
- * 1) For a diverged view, there were columns (pk columns) added to the view.
- * 2) For a non-diverged view if the base column count changed.
- */
- boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
- || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
- updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
- invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
- viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
+ /*
+ * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
+ * table pk columns 2. if we are adding all the existing view pk columns to the base table
+ */
+ if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+ }
+ addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
+ deltaNumPkColsSoFar);
+
+ /*
+ * Increment the sequence number by 1 if:
+ * 1) For a diverged view, there were columns (pk columns) added to the view.
+ * 2) For a non-diverged view if the base column count changed.
+ */
+ boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
+ || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
+ updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
+ invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
+ viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
}
return null;
}
@@ -2500,8 +2537,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
try {
existingViewColumn =
- columnFamily == null ? view.getColumn(columnName) : view
- .getColumnFamily(columnFamily).getColumn(columnName);
+ columnFamily == null ? view.getPColumnForColumnName(columnName) : view
+ .getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
} catch (ColumnFamilyNotFoundException e) {
// ignore since it means that the column family is not present for the column to
// be added.
@@ -2567,7 +2604,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
if (existingViewColumn != null) {
-
+ if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) {
+ /*
+ * If the column already exists in a view, then we cannot add the column to the base
+ * table. The reason is subtle and is as follows: consider the case where a table
+ * has two views where both the views have the same key value column KV. Now, we
+ * dole out encoded column qualifiers for key value columns in views by using the
+ * counters stored in the base physical table. So the KV column can have different
+ * column qualifiers for the two views. For example, 11 for VIEW1 and 12 for VIEW2.
+ * This naturally extends to rows being inserted using the two views having
+ * different column qualifiers for the column named KV. Now, when an attempt is made
+ * to add column KV to the base table, we cannot decide which column qualifier
+ * that column should be assigned. It cannot be a number different than 11 or 12
+ * since a query like SELECT KV FROM BASETABLE would return null for KV which is
+ * incorrect since column KV is present in rows inserted from the two views. We
+ * cannot use 11 or 12 either because we will then incorrectly return value of KV
+ * column inserted using only one view.
+ */
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+ }
// Validate data type is same
int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) {
@@ -2797,6 +2852,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return mutationResult;
}
}
+ } else if (type == PTableType.VIEW
+ && EncodedColumnsUtil.usesEncodedColumnNames(table)) {
+ /*
+ * When adding a column to a view that uses encoded column name scheme, we
+ * need to modify the CQ counters stored in the view's physical table. So to
+ * make sure clients get the latest PTable, we need to invalidate the cache
+ * entry.
+ */
+ invalidateList.add(new ImmutableBytesPtr(MetaDataUtil
+ .getPhysicalTableRowForView(table)));
}
for (Mutation m : tableMetaData) {
byte[] key = m.getRow();
@@ -2810,7 +2875,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
&& rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
PColumnFamily family =
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
- family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+ family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
} else if (pkCount > COLUMN_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
addingPKColumn = true;
@@ -3063,7 +3128,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PColumnFamily family =
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
columnToDelete =
- family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+ family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
} else if (pkCount > COLUMN_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
deletePKColumn = true;
@@ -3152,10 +3217,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] indexKey =
SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
.getTableName().getBytes());
+ Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString());
+ boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
+ boolean isCoveredColumn = indexMaintainer.getCoveredColumnInfo().contains(columnToDeleteInfo);
// If index requires this column for its pk, then drop it
- if (indexMaintainer.getIndexedColumns().contains(
- new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
- .getName().getBytes()))) {
+ if (isColumnIndexed) {
// Since we're dropping the index, lock it to ensure
// that a change in index state doesn't
// occur while we're dropping it.
@@ -3176,9 +3242,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
// If the dropped column is a covered index column, invalidate the index
- else if (indexMaintainer.getCoveredColumns().contains(
- new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
- .getName().getBytes()))) {
+ else if (isCoveredColumn){
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ede568e9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 3cfe790..8a833ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -107,7 +107,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
}
- public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+ private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
if (topN == null) {
return null;
@@ -125,7 +125,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
orderByExpression.readFields(input);
orderByExpressions.add(orderByExpression);
}
- ResultIterator inner = new RegionScannerResultIterator(s);
+ ResultIterator inner = new RegionScannerResultIterator(s, ScanUtil.getMinMaxQualifiersFromScan(scan));
return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
estimatedRowSize);
} catch (IOException e) {
@@ -218,21 +218,24 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ //TODO: samarth make this a client-side check by looking at the order by and group by expressions, then use that to set the min/max qualifiers. We can then make useQualifierAsIndex
+ // a member variable of BaseScannerRegionObserver.
+ boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan)) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
innerScanner =
getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
- viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+ viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr, useQualifierAsIndex);
final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
if (j != null) {
- innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment());
+ innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment(), useQualifierAsIndex);
}
if (scanOffset != null) {
innerScanner = getOffsetScanner(c, innerScanner,
- new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset),
+ new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, ScanUtil.getMinMaxQualifiersFromScan(scan)), scanOffset),
scan.getAttribute(QueryConstants.LAST_SCAN) != null);
}
- final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+ final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner);
if (iterator == null) {
return innerScanner;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ede568e9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0d0f0c2..a313dd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -96,7 +97,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
@@ -305,6 +309,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCQ = null;
byte[] deleteCF = null;
byte[] emptyCF = null;
+ byte[] emptyKVQualifier = null;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
@@ -320,12 +325,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
}
emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+ emptyKVQualifier = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER);
}
TupleProjector tupleProjector = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan)) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
if (dataColumns != null) {
tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -334,11 +341,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
theScanner =
getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
- c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+ c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
}
if (j != null) {
- theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env);
+ theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex);
}
int batchSize = 0;
@@ -374,7 +381,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Aggregator[] rowAggregators = aggregators.getAggregators();
boolean hasMore;
boolean hasAny = false;
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
}
@@ -392,7 +400,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
acquiredLock = true;
synchronized (innerScanner) {
do {
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned