You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/08/08 01:42:09 UTC
[10/11] phoenix git commit: Encode column names and take advantage of
encoding in group by and order by
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 13963d7..5e64209 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -56,6 +56,7 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.TypeMismatchException;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -170,8 +171,8 @@ public class WhereCompiler {
TableRef tableRef = ref.getTableRef();
if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) {
// track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs
- context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName()
- .getBytes());
+ byte[] cq = EncodedColumnsUtil.getColumnQualifier(ref.getColumn(), tableRef.getTable());
+ context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq);
}
return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
}
@@ -194,7 +195,7 @@ public class WhereCompiler {
// just use that.
try {
if (!SchemaUtil.isPKColumn(ref.getColumn())) {
- table.getColumn(ref.getColumn().getName().getString());
+ table.getPColumnForColumnName(ref.getColumn().getName().getString());
}
} catch (AmbiguousColumnException e) {
disambiguateWithFamily = true;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index be4766f..5922b5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@@ -51,18 +50,19 @@ import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.tephra.Transaction;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import com.google.common.collect.ImmutableList;
-import org.apache.tephra.Transaction;
-
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -79,6 +79,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String DELETE_CQ = "_DeleteCQ";
public static final String DELETE_CF = "_DeleteCF";
public static final String EMPTY_CF = "_EmptyCF";
+ public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier";
public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
public static final String LOCAL_INDEX = "_LocalIndex";
@@ -102,6 +103,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public final static String SCAN_OFFSET = "_RowOffset";
public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix";
public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix";
+ public final static String MIN_QUALIFIER = "_MinQualifier";
+ public final static String MAX_QUALIFIER = "_MaxQualifier";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -261,14 +264,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
* @param indexMaintainer
* @param viewConstants
*/
- protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
final RegionScanner s, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
final HRegion dataRegion, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final TupleProjector projector,
- final ImmutableBytesWritable ptr) {
+ final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
- dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
+ dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
/**
@@ -286,7 +289,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
* @param tx current transaction
* @param viewConstants
*/
- protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+ RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
final Expression[] arrayFuncRefs, final int offset, final Scan scan,
final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
@@ -294,7 +297,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
Transaction tx,
final byte[][] viewConstants, final KeyValueSchema kvSchema,
final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
- final ImmutableBytesWritable ptr) {
+ final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) {
return new RegionScanner() {
private boolean hasReferences = checkForReferenceFiles();
@@ -391,11 +394,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
}
if (projector != null) {
- Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+ // TODO (samarth): confirm that choosing the tuple type based on useQualifierAsListIndex is the right thing to do here.
+ Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
+ Tuple tuple = projector.projectResults(toProject);
result.clear();
result.add(tuple.getValue(0));
- if(arrayElementCell != null)
+ if (arrayElementCell != null) {
result.add(arrayElementCell);
+ }
}
// There is a scanattribute set to retrieve the specific array element
return next;
@@ -429,7 +435,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
}
if (projector != null) {
- Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+ Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result));
+ Tuple tuple = projector.projectResults(toProject);
result.clear();
result.add(tuple.getValue(0));
if(arrayElementCell != null)
@@ -482,24 +489,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
// Using KeyValueSchema to set and retrieve the value
// collect the first kv to get the row
Cell rowKv = result.get(0);
- for (KeyValueColumnExpression kvExp : arrayKVRefs) {
- if (kvExp.evaluate(tuple, ptr)) {
- for (int idx = tuple.size() - 1; idx >= 0; idx--) {
- Cell kv = tuple.getValue(idx);
- if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
- kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
- && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length,
- kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
- // remove the kv that has the full array values.
- result.remove(idx);
- break;
- }
- }
- }
- }
byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs,
kvSchemaBitSet, ptr);
// Add a dummy kv with the exact value of the array index
+ // TODO (samarth): work out how this dummy column qualifier interacts with encoded column names.
result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index f88a931..089c4fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -56,22 +56,27 @@ public class DelegateRegionScanner implements RegionScanner {
delegate.close();
}
+ @Override
public long getMaxResultSize() {
return delegate.getMaxResultSize();
}
+ @Override
public boolean next(List<Cell> arg0, int arg1) throws IOException {
return delegate.next(arg0, arg1);
}
+ @Override
public boolean next(List<Cell> arg0) throws IOException {
return delegate.next(arg0);
}
+ @Override
public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException {
return delegate.nextRaw(arg0, arg1);
}
+ @Override
public boolean nextRaw(List<Cell> arg0) throws IOException {
return delegate.nextRaw(arg0);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index cfe0e4a..9bb47fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
@@ -63,7 +65,10 @@ import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.IndexUtil;
@@ -132,6 +137,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null);
if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
if (dataColumns != null) {
tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -140,7 +146,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
innerScanner =
getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
- c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+ c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
}
if (j != null) {
@@ -156,9 +162,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
if (keyOrdered) { // Optimize by taking advantage that the rows are
// already in the required group by key order
- return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
+ return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, j != null);
} else { // Otherwse, collect them all up in an in memory map
- return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
+ return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, j != null);
}
}
@@ -364,7 +370,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
*/
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner scanner, final List<Expression> expressions,
- final ServerAggregators aggregators, long limit) throws IOException {
+ final ServerAggregators aggregators, long limit, boolean isJoin) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
+ ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
@@ -378,7 +384,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
estDistVals = Math.max(MIN_DISTINCT_VALUES,
(int) (Bytes.toInt(estDistValsBytes) * 1.5f));
}
-
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin);
final boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
@@ -389,12 +396,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
boolean success = false;
try {
boolean hasMore;
-
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
}
-
HRegion region = c.getEnvironment().getRegion();
boolean acquiredLock = false;
try {
@@ -402,7 +407,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
acquiredLock = true;
synchronized (scanner) {
do {
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there are
@@ -437,7 +442,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
}
}
-
+
/**
* Used for an aggregate query in which the key order match the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
@@ -446,12 +451,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
- final ServerAggregators aggregators, final long limit) throws IOException {
+ final ServerAggregators aggregators, final long limit, final boolean isJoin) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
}
+ final Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ final boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin);
return new BaseRegionScanner(scanner) {
private long rowCount = 0;
private ImmutableBytesWritable currentKey = null;
@@ -461,7 +468,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
boolean hasMore;
boolean atLimit;
boolean aggBoundary = false;
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
ImmutableBytesWritable key = null;
Aggregator[] rowAggregators = aggregators.getAggregators();
// If we're calculating no aggregate functions, we can exit at the
@@ -474,7 +481,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
acquiredLock = true;
synchronized (scanner) {
do {
- List<Cell> kvs = new ArrayList<Cell>();
+ List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there
@@ -512,6 +519,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(),
currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
AGG_TIMESTAMP, value, 0, value.length);
+ // TODO (samarth): decide how to handle this. We add cells like this one to the results that we are
+ // returning, and the bounded skip-null cell list won't handle them properly.
+ // Would having a reserved set of column qualifiers help here?
results.add(keyValue);
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate row: "
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 2650225..8c2c3d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -110,7 +110,7 @@ public class HashJoinRegionScanner implements RegionScanner {
private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
if (result.isEmpty())
return;
-
+ // TODO (samarth): make joins work with position-based lookup.
Tuple tuple = new ResultTuple(Result.create(result));
// For backward compatibility. In new versions, HashJoinInfo.forceProjection()
// always returns true.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5d20fbb..4f41bf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -27,6 +27,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
@@ -34,6 +35,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
@@ -57,6 +59,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
@@ -188,8 +191,10 @@ import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
@@ -204,10 +209,12 @@ import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
@@ -278,6 +285,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
+ private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
+ private static final KeyValue QUALIIFIER_COUNTER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES);
private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
EMPTY_KEYVALUE_KV,
@@ -304,7 +313,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
UPDATE_CACHE_FREQUENCY_KV,
IS_NAMESPACE_MAPPED_KV,
AUTO_PARTITION_SEQ_KV,
- APPEND_ONLY_SCHEMA_KV
+ APPEND_ONLY_SCHEMA_KV,
+ STORAGE_SCHEME_KV
);
static {
Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -334,6 +344,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
+ private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -347,6 +358,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
+ private static final KeyValue ENCODED_COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODED_COLUMN_QUALIFIER_BYTES);
private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
DECIMAL_DIGITS_KV,
COLUMN_SIZE_KV,
@@ -359,7 +371,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
VIEW_CONSTANT_KV,
IS_VIEW_REFERENCED_KV,
COLUMN_DEF_KV,
- IS_ROW_TIMESTAMP_KV
+ IS_ROW_TIMESTAMP_KV,
+ ENCODED_COLUMN_QUALIFIER_KV
);
static {
Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -375,9 +388,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
+ private static final int ENCODED_COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(ENCODED_COLUMN_QUALIFIER_KV);
private static final int LINK_TYPE_INDEX = 0;
-
+
private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
@@ -713,8 +727,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
isRowTimestampKV.getValueLength()));
+ Cell columnQualifierKV = colKeyValues[ENCODED_COLUMN_QUALIFIER_INDEX];
+ Integer columnQualifier = columnQualifierKV == null ? null :
+ PInteger.INSTANCE.getCodec().decodeInt(columnQualifierKV.getValueArray(), columnQualifierKV.getValueOffset(), SortOrder.getDefault());
- PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
+ PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifier);
columns.add(column);
}
@@ -922,11 +939,15 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
: Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
-
-
+ Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
+ //TODO: change this once we start having other values for storage schemes
+ StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme
+ .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
+ storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = new ArrayList<PTable>();
List<PName> physicalTables = new ArrayList<PName>();
+ int counter = 0;
while (true) {
results.clear();
scanner.next(results);
@@ -934,29 +955,41 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
break;
}
Cell colKv = results.get(LINK_TYPE_INDEX);
- int colKeyLength = colKv.getRowLength();
- PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
- int colKeyOffset = offset + colName.getBytes().length + 1;
- PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
- if (colName.getString().isEmpty() && famName != null) {
- LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
- if (linkType == LinkType.INDEX_TABLE) {
- addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
- } else if (linkType == LinkType.PHYSICAL_TABLE) {
- physicalTables.add(famName);
+ if (colKv != null) {
+ int colKeyLength = colKv.getRowLength();
+ PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+ int colKeyOffset = offset + colName.getBytes().length + 1;
+ PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+ if (colName.getString().isEmpty() && famName != null) {
+ if (isQualifierCounterKv(colKv)) {
+ counter = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(),
+ colKv.getValueOffset(), SortOrder.ASC);
+ } else {
+ LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+ if (linkType == LinkType.INDEX_TABLE) {
+ addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+ } else if (linkType == LinkType.PHYSICAL_TABLE) {
+ physicalTables.add(famName);
+ }
+ }
+ } else {
+ addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
}
- } else {
- addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
- }
+ }
}
// Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
// server while holding this lock is a bad idea and likely to cause contention.
+ EncodedCQCounter cqCounter = (storageScheme == StorageScheme.NON_ENCODED_COLUMN_NAMES || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER : new EncodedCQCounter(counter);
+ PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getPhysicalTableName(
+ Bytes.toBytes(SchemaUtil.getTableName(schemaName.getBytes(), tableName.getBytes())), isNamespaceMapped)
+ .getNameAsString()) : physicalTables.get(0);
+
return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName,
viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
- indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema);
+ indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, cqCounter);
}
private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
@@ -978,6 +1011,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return new PSchema(schemaName.getString(), timeStamp);
}
+ private static boolean isQualifierCounterKv(Cell colKv) { // true iff colKv's qualifier bytes equal the qualifier bytes of QUALIIFIER_COUNTER_KV
+ return Bytes.compareTo(colKv.getQualifierArray(), colKv.getQualifierOffset(),
+ colKv.getQualifierLength(), QUALIIFIER_COUNTER_KV.getQualifierArray(),
+ QUALIIFIER_COUNTER_KV.getQualifierOffset(), QUALIIFIER_COUNTER_KV.getQualifierLength()) == 0; // only the qualifier is compared; row, family and value are not inspected
+ }
+
private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
throws IOException, SQLException {
List<Cell> results = Lists.newArrayList();
@@ -1928,7 +1967,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return result;
}
region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
- // Invalidate from cache
+ // Invalidate from cache. //TODO: samarth - should we invalidate the base table from the cache here too?
for (ImmutableBytesPtr invalidateKey : invalidateList) {
metaDataCache.invalidate(invalidateKey);
}
@@ -2101,6 +2140,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[][] rkmd = new byte[5][];
int pkCount = getVarChars(m.getRow(), rkmd);
if (pkCount > COLUMN_NAME_INDEX
+ && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0
&& Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
@@ -2135,8 +2175,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
try {
- existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily(
- columnFamily).getColumn(columnName);
+ existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily(
+ columnFamily).getPColumnForColumnName(columnName);
} catch (ColumnFamilyNotFoundException e) {
// ignore since it means that the column family is not present for the column to be added.
} catch (ColumnNotFoundException e) {
@@ -2263,26 +2303,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
columnsAddedToBaseTable++;
}
}
- /*
- * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
- * table pk columns 2. if we are adding all the existing view pk columns to the base table
- */
- if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
- return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
- }
- addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
- deltaNumPkColsSoFar);
-
- /*
- * Increment the sequence number by 1 if:
- * 1) For a diverged view, there were columns (pk columns) added to the view.
- * 2) For a non-diverged view if the base column count changed.
- */
- boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
- || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
- updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
- invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
- viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
+ /*
+ * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
+ * table pk columns 2. if we are adding all the existing view pk columns to the base table
+ */
+ if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+ }
+ addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
+ deltaNumPkColsSoFar);
+
+ /*
+ * Increment the sequence number by 1 if:
+ * 1) For a diverged view, there were columns (pk columns) added to the view.
+ * 2) For a non-diverged view if the base column count changed.
+ */
+ boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
+ || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
+ updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
+ invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
+ viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
}
return null;
}
@@ -2440,8 +2480,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
try {
existingViewColumn =
- columnFamily == null ? view.getColumn(columnName) : view
- .getColumnFamily(columnFamily).getColumn(columnName);
+ columnFamily == null ? view.getPColumnForColumnName(columnName) : view
+ .getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
} catch (ColumnFamilyNotFoundException e) {
// ignore since it means that the column family is not present for the column to
// be added.
@@ -2507,7 +2547,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
if (existingViewColumn != null) {
-
+ if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) {
+ return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+ }
// Validate data type is same
int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) {
@@ -2750,7 +2792,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
&& rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
PColumnFamily family =
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
- family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+ family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
} else if (pkCount > COLUMN_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
addingPKColumn = true;
@@ -3003,7 +3045,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PColumnFamily family =
table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
columnToDelete =
- family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+ family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
} else if (pkCount > COLUMN_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
deletePKColumn = true;
@@ -3092,10 +3134,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] indexKey =
SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
.getTableName().getBytes());
+ byte[] cq = EncodedColumnsUtil.getColumnQualifier(columnToDelete, index);
+ ColumnReference colRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), cq);
// If index requires this column for its pk, then drop it
- if (indexMaintainer.getIndexedColumns().contains(
- new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
- .getName().getBytes()))) {
+ if (indexMaintainer.getIndexedColumns().contains(colRef)) {
// Since we're dropping the index, lock it to ensure
// that a change in index state doesn't
// occur while we're dropping it.
@@ -3116,9 +3158,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
// If the dropped column is a covered index column, invalidate the index
- else if (indexMaintainer.getCoveredColumns().contains(
- new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
- .getName().getBytes()))) {
+ else if (indexMaintainer.getCoveredColumns().contains(colRef)){
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 457555e..accaaa9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
@@ -64,12 +63,11 @@ import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.tephra.Transaction;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.tephra.Transaction;
-
/**
*
@@ -108,7 +106,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
}
- public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+ private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s, boolean isJoin) {
byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
if (topN == null) {
return null;
@@ -126,7 +124,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
orderByExpression.readFields(input);
orderByExpressions.add(orderByExpression);
}
- ResultIterator inner = new RegionScannerResultIterator(s);
+ ResultIterator inner = new RegionScannerResultIterator(s, ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin);
return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
estimatedRowSize);
} catch (IOException e) {
@@ -219,10 +217,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ //TODO: samarth get rid of this join special-casing. Joins should support position-based lookup.
+ boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
innerScanner =
getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
- viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+ viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr, useQualifierAsIndex);
final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
if (j != null) {
@@ -230,10 +230,10 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
}
if (scanOffset != null) {
innerScanner = getOffsetScanner(c, innerScanner,
- new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset),
+ new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, ScanUtil.getMinMaxQualifiersFromScan(scan), j != null), scanOffset),
scan.getAttribute(QueryConstants.LAST_SCAN) != null);
}
- final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+ final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner, j != null);
if (iterator == null) {
return innerScanner;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 2e2d580..89ccff0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -80,6 +80,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE";
private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS));
+ //TODO: samarth verify that it is ok to send non-encoded empty column here. Probably is.
private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
byte[] errorCodeBuf = new byte[PInteger.INSTANCE.getByteSize()];
PInteger.INSTANCE.getCodec().encodeInt(errorCode, errorCodeBuf, 0);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a312020..c1ef0b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -49,10 +50,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
@@ -64,7 +63,6 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
@@ -98,7 +96,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
@@ -114,6 +115,7 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TimeKeeper;
+import org.apache.tephra.TxConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,8 +123,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.tephra.TxConstants;
-
/**
* Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY).
@@ -300,6 +300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCQ = null;
byte[] deleteCF = null;
byte[] emptyCF = null;
+ byte[] emptyKVQualifier = null;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
@@ -315,12 +316,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
}
emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+ emptyKVQualifier = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER);//TODO: samarth check this
}
TupleProjector tupleProjector = null;
byte[][] viewConstants = null;
ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
if (dataColumns != null) {
tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -329,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
theScanner =
getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
- c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+ c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
}
if (j != null) {
@@ -369,7 +372,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Aggregator[] rowAggregators = aggregators.getAggregators();
boolean hasMore;
boolean hasAny = false;
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
}
@@ -386,7 +390,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
acquiredLock = true;
synchronized (innerScanner) {
do {
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned
@@ -589,8 +593,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (!timeStamps.contains(kvts)) {
Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
kv.getRowLength());
- put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
- ByteUtil.EMPTY_BYTE_ARRAY);
+ // FIXME: Use the right byte array value. Transactional tables can't
+ // have empty byte arrays since Tephra sees them as delete markers.
+ put.add(emptyCF, emptyKVQualifier != null ? emptyKVQualifier
+ : QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
mutations.add(put);
}
}