You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/08/08 01:42:07 UTC
[08/11] phoenix git commit: Encode column names and take advantage of
encoding in group by and order by
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
index 4b5fdbb..35862c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
@@ -28,6 +28,7 @@ import org.apache.phoenix.expression.visitor.ExpressionVisitor;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -41,13 +42,13 @@ import org.apache.phoenix.util.SchemaUtil;
public class KeyValueColumnExpression extends ColumnExpression {
private byte[] cf;
private byte[] cq;
- private String displayName; // client-side only
+ private String displayName; // client-side only. TODO: samarth see what can you do for encoded column names.
public KeyValueColumnExpression() {
}
- public KeyValueColumnExpression(PColumn column) {
- this(column, null);
+ public KeyValueColumnExpression(PColumn column, boolean encodedColumnName) {
+ this(column, null, encodedColumnName);
}
public KeyValueColumnExpression(PDatum column, byte[] cf, byte[] cq) {
@@ -56,18 +57,19 @@ public class KeyValueColumnExpression extends ColumnExpression {
this.cq = cq;
}
- public KeyValueColumnExpression(PColumn column, String displayName) {
+ public KeyValueColumnExpression(PColumn column, String displayName, boolean encodedColumnName) {
super(column);
this.cf = column.getFamilyName().getBytes();
- this.cq = column.getName().getBytes();
+ this.cq = EncodedColumnsUtil.getColumnQualifier(column, encodedColumnName);
this.displayName = displayName;
}
public byte[] getColumnFamily() {
return cf;
}
-
- public byte[] getColumnName() {
+
+ //TODO: samarth look for the callers of this.
+ public byte[] getColumnQualifier() {
return cq;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 3a38dee..2744f35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -154,6 +154,7 @@ public class ProjectedColumnExpression extends ColumnExpression {
return Determinism.PER_INVOCATION;
}
+ @Override
public ProjectedColumnExpression clone() {
return new ProjectedColumnExpression(this.column, this.columns, this.position, this.displayName);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index b8b0350..d1f6211 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.EncodedColumnsUtil;
/**
* When selecting specific columns in a SELECT query, this filter passes only selected columns
@@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
private byte[] emptyCFName;
private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker;
private Set<byte[]> conditionOnlyCfs;
+ private boolean usesEncodedColumnNames;
+ private byte[] emptyKVQualifier;
public ColumnProjectionFilter() {
@@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
public ColumnProjectionFilter(byte[] emptyCFName,
Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker,
- Set<byte[]> conditionOnlyCfs) {
+ Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) {
this.emptyCFName = emptyCFName;
this.columnsTracker = columnsTracker;
this.conditionOnlyCfs = conditionOnlyCfs;
+ this.usesEncodedColumnNames = usesEncodedColumnNames;
+ this.emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
}
@Override
@@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
familyMapSize--;
}
int conditionOnlyCfsSize = WritableUtils.readVInt(input);
+ usesEncodedColumnNames = conditionOnlyCfsSize > 0;
+ emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+ conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value.
this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
while (conditionOnlyCfsSize > 0) {
this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
@@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
}
}
}
- // Write conditionOnlyCfs
- WritableUtils.writeVInt(output, this.conditionOnlyCfs.size());
+ // Encode usesEncodedColumnNames in conditionOnlyCfs size.
+ WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 1 : -1));
for (byte[] f : this.conditionOnlyCfs) {
WritableUtils.writeCompressedByteArray(output, f);
}
- }
+
+}
@Override
public byte[] toByteArray() throws IOException {
@@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
// make sure we're not holding to any of the byte[]'s
ptr.set(HConstants.EMPTY_BYTE_ARRAY);
if (kvs.isEmpty()) {
- kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName,
- 0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
+ kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),
+ this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0,
+ emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index dba700b..c3d52a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -94,7 +94,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
refCount = foundColumns.size();
}
- public ReturnCode resolveColumn(Cell value) {
+ private ReturnCode resolveColumn(Cell value) {
// Always set key, in case we never find a key value column of interest,
// and our expression uses row key columns.
setKey(value);
@@ -184,7 +184,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
@Override
public Void visit(KeyValueColumnExpression expression) {
- inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName());
+ inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier());
return null;
}
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index eaf8d35..07f7072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -58,7 +58,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
@Override
public Void visit(KeyValueColumnExpression expression) {
cf = expression.getColumnFamily();
- cq = expression.getColumnName();
+ cq = expression.getColumnQualifier();
return null;
}
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 6f9caa6..0f960e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import com.google.common.collect.Lists;
-import com.google.common.collect.Lists;
/**
*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index ee67ca2..2ad0c8d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -32,8 +32,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
@@ -85,6 +87,7 @@ import org.apache.phoenix.schema.tuple.ValueGetterTuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.BitSet;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -275,8 +278,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
// columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
private Set<ColumnReference> indexedColumns;
private Set<ColumnReference> coveredColumns;
- // Map used to cache column family of data table and the corresponding column family for the local index
- private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap;
+ // Map of covered columns where a key is column reference for a column in the data table
+ // and value is column reference for corresponding column in the index table.
+ // TODO: samarth confirm that we don't need a separate map for tracking column families of local indexes.
+ private Map<ColumnReference, ColumnReference> coveredColumnsMap;
// columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key)
private Set<ColumnReference> allColumns;
// TODO remove this in the next major release
@@ -295,13 +300,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
private final boolean isDataTableSalted;
private final RowKeySchema dataRowKeySchema;
- private List<ImmutableBytesPtr> indexQualifiers;
private int estimatedIndexRowKeyBytes;
private int estimatedExpressionSize;
private int[] dataPkPosition;
private int maxTrailingNulls;
private ColumnReference dataEmptyKeyValueRef;
private boolean rowKeyOrderOptimizable;
+ private boolean usesEncodedColumnNames;
+ private ImmutableBytesPtr emptyKeyValueQualifierPtr;
private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
this.dataRowKeySchema = dataRowKeySchema;
@@ -315,14 +321,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.isMultiTenant = dataTable.isMultiTenant();
this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+ /*
+ * There is nothing to prevent new indexes on existing tables to have encoded column names.
+ * Except, due to backward compatibility reasons, we aren't able to change IndexMaintainer and the state
+ * that is serialized in it. Because of this we are forced to have the indexes inherit the
+ * storage scheme of the parent data tables.
+ */
+ this.usesEncodedColumnNames = EncodedColumnsUtil.usesEncodedColumnNames(dataTable);
byte[] indexTableName = index.getPhysicalName().getBytes();
// Use this for the nDataSaltBuckets as we need this for local indexes
// TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum();
boolean indexWALDisabled = index.isWALDisabled();
int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-// int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
int nIndexColumns = index.getColumns().size() - indexPosOffset;
int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
// number of expressions that are indexed that are not present in the row key of the data table
@@ -333,7 +344,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
try {
- PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+ PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
if (SchemaUtil.isPKColumn(dataColumn))
continue;
} catch (ColumnNotFoundException e) {
@@ -366,7 +377,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
- this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
+ this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns);
this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets;
this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -408,7 +419,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
throw new RuntimeException(e); // Impossible
}
if ( expressionIndexCompiler.getColumnRef()!=null ) {
- // get the column of the data table that corresponds to this index column
+ // get the data table column that corresponds to this index column
PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
boolean isPKColumn = SchemaUtil.isPKColumn(column);
if (isPKColumn) {
@@ -432,12 +443,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
for (int i = 0; i < index.getColumnFamilies().size(); i++) {
PColumnFamily family = index.getColumnFamilies().get(i);
for (PColumn indexColumn : family.getColumns()) {
- PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
- PName dataTableFamily = column.getFamilyName();
- this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes()));
- if(isLocalIndex) {
- this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString()))));
- }
+ PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
+ byte[] dataColumnCq = EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable);
+ byte[] indexColumnCq = EncodedColumnsUtil.getColumnQualifier(indexColumn, index);
+ this.coveredColumns.add(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq));
+ this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq),
+ new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
}
}
this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
@@ -853,14 +864,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
put = new Put(indexRowKey);
// add the keyvalue for the empty row
put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
- this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+ this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
// set the value to the empty column name
- QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+ emptyKeyValueQualifierPtr));
put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
- int i = 0;
for (ColumnReference ref : this.getCoveredColumns()) {
- ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
+ //FIXME: samarth figure out a backward compatible way to handle this as coveredColumnsMap won't be available for older phoenix clients.
+ ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+ ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+ ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
@@ -870,12 +883,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
//this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
- if(this.isLocalIndex) {
- ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable());
- put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value));
- } else {
- put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
- }
+ put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
}
}
return put;
@@ -963,14 +971,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
for (ColumnReference ref : getCoveredColumns()) {
byte[] family = ref.getFamily();
- if (this.isLocalIndex) {
- family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get();
- }
+ ColumnReference indexColumn = coveredColumnsMap.get(ref);
// If table delete was single version, then index delete should be as well
if (deleteType == DeleteType.SINGLE_VERSION) {
- delete.deleteFamilyVersion(family, ts);
+ delete.deleteFamilyVersion(indexColumn.getFamily(), ts);
} else {
- delete.deleteFamily(family, ts);
+ delete.deleteFamily(indexColumn.getFamily(), ts);
}
}
if (deleteType == DeleteType.SINGLE_VERSION) {
@@ -991,12 +997,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
delete = new Delete(indexRowKey);
delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
- byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily();
+ ColumnReference indexColumn = coveredColumnsMap.get(ref);
// If point delete for data table, then use point delete for index as well
- if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
- delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
+ //FIXME: samarth change this. Index column qualifiers are not derivable from data table cqs.
+ // Figure out a backward compatible way of doing this since coveredColumnsMap won't be available
+ // for older clients.
+ delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts);
} else {
- delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts);
}
}
}
@@ -1051,15 +1060,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
- dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
+ coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
for (int i = 0; i < nCoveredColumns; i++) {
- byte[] cf = Bytes.readByteArray(input);
- byte[] cq = Bytes.readByteArray(input);
- ColumnReference ref = new ColumnReference(cf,cq);
- coveredColumns.add(ref);
- if(isLocalIndex) {
- dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf)))));
- }
+ byte[] dataTableCf = Bytes.readByteArray(input);
+ byte[] dataTableCq = Bytes.readByteArray(input);
+ byte[] indexTableCf = Bytes.readByteArray(input);
+ byte[] indexTableCq = Bytes.readByteArray(input);
+ ColumnReference dataColumn = new ColumnReference(dataTableCf, dataTableCq);
+ coveredColumns.add(dataColumn);
+ ColumnReference indexColumn = new ColumnReference(indexTableCf, indexTableCq);
+ coveredColumnsMap.put(dataColumn, indexColumn);
}
// Hack to serialize whether the index row key is optimizable
int len = WritableUtils.readVInt(input);
@@ -1085,6 +1095,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
if (isNewClient) {
int numIndexedExpressions = WritableUtils.readVInt(input);
+ usesEncodedColumnNames = numIndexedExpressions > 0;
+ numIndexedExpressions = Math.abs(numIndexedExpressions) - 1;
indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions);
for (int i = 0; i < numIndexedExpressions; i++) {
Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
@@ -1161,9 +1173,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
// Encode coveredColumns.size() and whether or not this is a local index
WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1));
- for (ColumnReference ref : coveredColumns) {
- Bytes.writeByteArray(output, ref.getFamily());
- Bytes.writeByteArray(output, ref.getQualifier());
+ for (Entry<ColumnReference, ColumnReference> ref : coveredColumnsMap.entrySet()) {
+ ColumnReference dataColumn = ref.getKey();
+ ColumnReference indexColumn = ref.getValue();
+ Bytes.writeByteArray(output, dataColumn.getFamily());
+ Bytes.writeByteArray(output, dataColumn.getQualifier());
+ Bytes.writeByteArray(output, indexColumn.getFamily());
+ Bytes.writeByteArray(output, indexColumn.getQualifier());
}
// TODO: remove when rowKeyOrderOptimizable hack no longer needed
WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1));
@@ -1174,7 +1190,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength());
- WritableUtils.writeVInt(output, indexedExpressions.size());
+ // Hack to encode usesEncodedColumnNames in indexedExpressions size.
+ int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1);
+ WritableUtils.writeVInt(output, indexedExpressionsSize);
for (Expression expression : indexedExpressions) {
WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
expression.write(output);
@@ -1231,16 +1249,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
* Init calculated state reading/creating
*/
private void initCachedState() {
- dataEmptyKeyValueRef =
- new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
- QueryConstants.EMPTY_COLUMN_BYTES);
-
- indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
- for (ColumnReference ref : coveredColumns) {
- indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName(
- ref.getFamily(), ref.getQualifier())));
- }
-
+ byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+ dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+ emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier);
this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size());
// columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
@@ -1248,7 +1259,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() {
@Override
public Void visit(KeyValueColumnExpression expression) {
- if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+ if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) {
indexedColumnTypes.add(expression.getDataType());
}
return null;
@@ -1513,4 +1524,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return udfParseNodes;
}
}
+
+ public byte[] getEmptyKeyValueQualifier() {
+ return emptyKeyValueQualifierPtr.copyBytes();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index c67da6e..9ee5ea7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
@@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
for (ColumnReference ref : mutableColumns) {
scan.addColumn(ref.getFamily(), ref.getQualifier());
}
+ /*
+ * Indexes inherit the storage scheme of the data table which means all the indexes have the same
+ * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
+ * allowing new indexes over existing data tables to have a different storage scheme than the data
+ * table.
+ */
+ byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+
// Project empty key value column
- scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+ scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
scanRanges.initializeScan(scan);
TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
if (scanner != null) {
Result result;
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+ ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+ .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
// Process existing data table rows by removing the old index row and adding the new index row
while ((result = scanner.next()) != null) {
Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
@@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
// to generate point delete markers for all index rows that were added. We don't have Tephra
// manage index rows in change sets because we don't want to be hit with the additional
// memory hit and do not need to do conflict detection on index rows.
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+ ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
while ((result = scanner.next()) != null) {
Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
// Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index ceba000..496b3b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,12 +17,16 @@
*/
package org.apache.phoenix.iterate;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.ScanUtil.setMinMaxQualifiersOnScan;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -48,6 +52,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HConstants;
+import javax.management.Query;
+
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -87,8 +93,10 @@ import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PrefixByteCodec;
import org.apache.phoenix.util.PrefixByteDecoder;
@@ -207,7 +215,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Project empty key value unless the column family containing it has
// been projected in its entirety.
if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
- scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+ scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
}
}
}
@@ -225,7 +233,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if(offset!=null){
ScanUtil.addOffsetAttribute(scan, offset);
}
-
int cols = plan.getGroupBy().getOrderPreservingColumnCount();
if (cols > 0 && context.getWhereConditionColumns().size() == 0 &&
!plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
@@ -237,13 +244,76 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
new DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(),
cols));
}
-
+ //TODO: samarth add condition to not do position based look ups in case of joins so that we won't need to do the hacky check inside co-processors.
+ if (setMinMaxQualifiersOnScan(table)) {
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiers(scan, context);
+ if (minMaxQualifiers != null) {
+ scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getFirst()));
+ scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getSecond()));
+ }
+ }
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
}
}
-
+
+ private static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, StatementContext context) { // Returns (min, max) of the integer column qualifiers referenced by the WHERE-condition columns and by the qualifiers explicitly added to the scan; returns null when none are referenced.
+ PTable table = context.getCurrentTable().getTable();
+ checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(table), "Method should only be used for tables using encoded column names"); // precondition: only valid for tables that use encoded column names
+ Integer minQualifier = null;
+ Integer maxQualifier = null;
+ boolean emptyKVProjected = false; // set when the empty key value qualifier is seen; that qualifier is kept out of the min/max tracking
+ for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { // pass 1: columns referenced by WHERE conditions
+ byte[] cq = whereCol.getSecond();
+ if (cq != null) { // entries without a qualifier are skipped
+ int qualifier = (Integer)PInteger.INSTANCE.toObject(cq); // qualifier bytes are decoded to an Integer via PInteger
+ if (qualifier == ENCODED_EMPTY_COLUMN_NAME) {
+ emptyKVProjected = true;
+ continue;
+ }
+ if (minQualifier == null && maxQualifier == null) { // first non-empty qualifier seeds both bounds
+ minQualifier = maxQualifier = qualifier;
+ } else {
+ if (qualifier < minQualifier) { // else-if below suffices: min <= max always holds, so a value cannot fall outside both bounds
+ minQualifier = qualifier;
+ } else if (qualifier > maxQualifier) {
+ maxQualifier = qualifier;
+ }
+ }
+ }
+ }
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); // pass 2: qualifiers explicitly added to the scan, grouped by column family
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ if (entry.getValue() != null) { // NOTE(review): a null qualifier set presumably means the whole family is projected; such families contribute nothing to the range - confirm this is intended
+ for (byte[] cq : entry.getValue()) {
+ if (cq != null) {
+ int qualifier = (Integer)PInteger.INSTANCE.toObject(cq); // NOTE(review): same bound tracking as pass 1; candidate for a shared private helper
+ if (qualifier == ENCODED_EMPTY_COLUMN_NAME) {
+ emptyKVProjected = true;
+ continue;
+ }
+ if (minQualifier == null && maxQualifier == null) {
+ minQualifier = maxQualifier = qualifier;
+ } else {
+ if (qualifier < minQualifier) {
+ minQualifier = qualifier;
+ } else if (qualifier > maxQualifier) {
+ maxQualifier = qualifier;
+ }
+ }
+ }
+ }
+ }
+ }
+ if (minQualifier == null && emptyKVProjected) { // only the empty key value qualifier was referenced: the range collapses to that single qualifier
+ return new Pair<>(ENCODED_EMPTY_COLUMN_NAME, ENCODED_EMPTY_COLUMN_NAME);
+ } else if (minQualifier == null) { // no qualifier was referenced at all
+ return null;
+ }
+ return new Pair<>(minQualifier, maxQualifier);
+ }
+
private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
// columnsTracker contain cf -> qualifiers which should get returned.
@@ -340,7 +410,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// the ExplicitColumnTracker not to be used, though.
if (!statement.isAggregate() && filteredColumnNotInProjection) {
ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
- columnsTracker, conditionOnlyCfs));
+ columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table)));
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator {
};
}
- private final static Tuple UNINITIALIZED = new ResultTuple();
+ private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
private Tuple next = UNINITIALIZED;
abstract protected Tuple advance() throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
return this.index;
}
+ @Override
public int size() {
if (flushBuffer)
return flushedCount;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 8dcb2e8..e4c52c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
}
this.byteSize = queueEntries.getByteSize();
} catch (IOException e) {
- throw new SQLException("", e);
+ ServerUtil.createIOException(e.getMessage(), e);
} finally {
delegate.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..531bbe7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,24 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
public class RegionScannerResultIterator extends BaseResultIterator {
private final RegionScanner scanner;
+ private final Pair<Integer, Integer> minMaxQualifiers;
+ private final boolean useQualifierAsIndex;
- public RegionScannerResultIterator(RegionScanner scanner) {
+ public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, boolean isJoin) {
this.scanner = scanner;
+ this.useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin);
+ this.minMaxQualifiers = minMaxQualifiers;
}
@Override
@@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
synchronized (scanner) {
try {
// TODO: size
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned
@@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
}
// We instantiate a new tuple because in all cases currently we hang on to it
// (i.e. to compute and hold onto the TopN).
- MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+ Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
tuple.setKeyValues(results);
return tuple;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 2927de1..7da41c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -108,9 +108,9 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.SchemaUtil;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.TraceScope;
-import org.apache.phoenix.util.SchemaUtil;
import org.apache.tephra.TransactionContext;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 4fd4485..0e7db1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+
public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
@@ -315,6 +315,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
/** Version below which we fall back on the generic KeyValueBuilder */
public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
+ public static final String STORAGE_SCHEME = "STORAGE_SCHEME";
+ public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME);
+ public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+ public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(ENCODED_COLUMN_QUALIFIER);
+ public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+ public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
this.connection = connection;
@@ -588,9 +595,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
newCells.addAll(cells);
newCells.add(kv);
Collections.sort(newCells, KeyValue.COMPARATOR);
- resultTuple.setResult(Result.create(newCells));
+ tuple = new ResultTuple(Result.create(newCells));
}
-
return tuple;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
private final static String STRING_FALSE = "0";
private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
private final static Integer INTEGER_FALSE = Integer.valueOf(0);
- private final static Tuple BEFORE_FIRST = new ResultTuple();
+ private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
private final ResultIterator scanner;
private final RowProjector rowProjector;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 908a117..2d7550a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory {
int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset);
offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]);
ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize);
+ //TODO: samarth make joins work with position look up.
Tuple result = new ResultTuple(ResultUtil.toResult(value));
ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions);
List<Tuple> tuples = hashCacheMap.get(key);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index cacbce7..d94fa42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
+
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -207,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
not care about it
*/
private void initColumnIndexes() throws SQLException {
- columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+ columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
int columnIndex = 0;
for(int index = 0; index < logicalNames.size(); index++) {
PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -215,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
byte[] family = new byte[0];
- if (c.getFamilyName() != null) // Skip PK column
+ byte[] cq;
+ if (!SchemaUtil.isPKColumn(c)) {
family = c.getFamilyName().getBytes();
- byte[] name = c.getName().getBytes();
- byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+ cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+ } else {
+ // TODO: samarth verify if this is the right thing to do here.
+ cq = c.getName().getBytes();
+ }
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
if (!columnIndexes.containsKey(cfn)) {
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
}
}
byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
- byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
- QueryConstants.EMPTY_COLUMN_BYTES);
+ byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
}
@@ -242,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
private int findIndex(Cell cell) throws IOException {
byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength());
- byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+ byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
- byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+ byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
if(columnIndexes.containsKey(cfn)) {
return columnIndexes.get(cfn);
}
@@ -397,4 +404,4 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
return keyValues;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 15d6d2f..c5f690b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -89,7 +90,7 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+ Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
columnIndexes = new HashMap<>();
int columnIndex = 0;
for (int index = 0; index < logicalNames.size(); index++) {
@@ -98,12 +99,16 @@ public class FormatToKeyValueReducer
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
byte[] family = new byte[0];
- if (c.getFamilyName() != null) {
+ byte[] cq;
+ if (!SchemaUtil.isPKColumn(c)) {
family = c.getFamilyName().getBytes();
+ cq = EncodedColumnsUtil.getColumnQualifier(c, table);
+ } else {
+ // TODO: samarth verify if this is the right thing to do here.
+ cq = c.getName().getBytes();
}
- byte[] name = c.getName().getBytes();
- byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
- Pair<byte[], byte[]> pair = new Pair(family, name);
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+ Pair<byte[], byte[]> pair = new Pair<>(family, cq);
if (!indexMap.containsKey(cfn)) {
indexMap.put(cfn, new Integer(columnIndex));
columnIndexes.put(new Integer(columnIndex), pair);
@@ -111,8 +116,8 @@ public class FormatToKeyValueReducer
}
}
byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
- Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants
- .EMPTY_COLUMN_BYTES);
+ byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue);
columnIndexes.put(new Integer(columnIndex), pair);
columnIndex++;
}
@@ -123,18 +128,17 @@ public class FormatToKeyValueReducer
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- ImmutableBytesWritable rowKey = key.getRowkey();
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available() != 0) {
byte type = input.readByte();
int index = WritableUtils.readVInt(input);
ImmutableBytesWritable family;
- ImmutableBytesWritable name;
+ ImmutableBytesWritable cq;
ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
Pair<byte[], byte[]> pair = columnIndexes.get(index);
family = new ImmutableBytesWritable(pair.getFirst());
- name = new ImmutableBytesWritable(pair.getSecond());
+ cq = new ImmutableBytesWritable(pair.getSecond());
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
@@ -145,10 +149,10 @@ public class FormatToKeyValueReducer
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
- kv = builder.buildPut(key.getRowkey(), family, name, value);
+ kv = builder.buildPut(key.getRowkey(), family, cq, value);
break;
case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
+ kv = builder.buildDeleteColumns(key.getRowkey(), family, cq);
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
@@ -164,4 +168,4 @@ public class FormatToKeyValueReducer
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f14371d..ebaee7c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -570,6 +570,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
+ @Override
public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
synchronized (latestMetaDataLock) {
throwConnectionClosedIfNullMetaData();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9f8f58c..b1733f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
@@ -40,6 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
@@ -85,6 +87,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -118,7 +121,7 @@ import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.schema.types.PInteger;
/**
@@ -148,23 +151,30 @@ public interface QueryConstants {
public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
- public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
- public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
- public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes();
- public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes();
-
public static final long AGG_TIMESTAMP = HConstants.LATEST_TIMESTAMP;
/**
* Key used for a single row aggregation where there is no group by
*/
public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a");
- public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME;
- public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME;
-
- public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("a");
- // Use empty byte array for column qualifier so as not to accidentally conflict with any other columns
- public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY;
+
+ /** BEGIN Set of reserved column qualifiers **/
+
+ public static final String RESERVED_COLUMN_FAMILY = "_r";
+ public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = Bytes.toBytes(RESERVED_COLUMN_FAMILY);
+
+ public static final byte[] VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+ public static final byte[] VALUE_COLUMN_QUALIFIER = PInteger.INSTANCE.toBytes(1);
+
+ public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+ public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = PInteger.INSTANCE.toBytes(2);
+
+ public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
+ public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
+ public final static byte[] SINGLE_COLUMN = PInteger.INSTANCE.toBytes(3);
+ public final static byte[] SINGLE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+ /** END Set of reserved column qualifiers **/
+
public static final byte[] TRUE = new byte[] {1};
@@ -186,11 +196,18 @@ public interface QueryConstants {
public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
EMPTY_COLUMN_BYTES);
+ public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
+ public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = PInteger.INSTANCE.toBytes(ENCODED_EMPTY_COLUMN_NAME);
+ public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
+ ENCODED_EMPTY_COLUMN_BYTES);
public final static String EMPTY_COLUMN_VALUE = "x";
public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
EMPTY_COLUMN_VALUE_BYTES);
-
+ public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE; // aliases EMPTY_COLUMN_VALUE
+ public static final byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(ENCODED_EMPTY_COLUMN_VALUE); // derived from the encoded constant so the two cannot drift apart
+ public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+ ENCODED_EMPTY_COLUMN_VALUE_BYTES);
public static final String DEFAULT_COLUMN_FAMILY = "0";
public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
@@ -216,6 +233,13 @@ public interface QueryConstants {
public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue();
public static final int DIVERGED_VIEW_BASE_COLUMN_COUNT = -100;
public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1;
+
+ //TODO: samarth think about this more.
+ /**
+ * We mark counter values 0 to 10 as reserved. Value 0 is used by {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10
+ * are reserved for special column qualifiers returned by Phoenix co-processors.
+ */
+ public static final int ENCODED_CQ_COUNTER_INITIAL_VALUE = 11;
public static final String CREATE_TABLE_METADATA =
// Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists
// exception and add columns to the SYSTEM.TABLE dynamically.
@@ -282,6 +306,9 @@ public interface QueryConstants {
IS_NAMESPACE_MAPPED + " BOOLEAN," +
AUTO_PARTITION_SEQ + " VARCHAR," +
APPEND_ONLY_SCHEMA + " BOOLEAN," +
+ ENCODED_COLUMN_QUALIFIER + " INTEGER," +
+ STORAGE_SCHEME + " TINYINT, " +
+ COLUMN_QUALIFIER_COUNTER + " INTEGER, " +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
+ TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 892482d..0761b73 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -238,7 +238,7 @@ public class QueryServicesOptions {
// doesn't depend on phoenix-core.
public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF";
public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765;
- public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
+ public static final boolean DEFAULT_RENEW_LEASE_ENABLED = false;
public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;
public static final int DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 76f6218..544fb20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -22,6 +22,7 @@ import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -45,7 +46,7 @@ public class ColumnRef {
}
public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException {
- this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition());
+ this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition());
}
public ColumnRef(TableRef tableRef, int columnPosition) {
@@ -109,7 +110,7 @@ public class ColumnRef {
return new ProjectedColumnExpression(column, table, displayName);
}
- return new KeyValueColumnExpression(column, displayName);
+ return new KeyValueColumnExpression(column, displayName, EncodedColumnsUtil.usesEncodedColumnNames(table));
}
public ColumnRef cloneAtTimestamp(long timestamp) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index a60229e..4ac8f46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -90,4 +90,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
public boolean isDynamic() {
return getDelegate().isDynamic();
}
+
+ @Override
+ public Integer getEncodedColumnQualifier() {
+ return getDelegate().getEncodedColumnQualifier();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 3ee012f..c7547c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -96,8 +96,8 @@ public class DelegateTable implements PTable {
}
@Override
- public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
- return delegate.getColumn(name);
+ public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+ return delegate.getPColumnForColumnName(name);
}
@Override
@@ -280,4 +280,20 @@ public class DelegateTable implements PTable {
public boolean isAppendOnlySchema() {
return delegate.isAppendOnlySchema();
}
+
+ @Override
+ public StorageScheme getStorageScheme() {
+ return delegate.getStorageScheme();
+ }
+
+ @Override
+ public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+ return delegate.getPColumnForColumnQualifier(cq);
+ }
+
+ @Override
+ public EncodedCQCounter getEncodedCQCounter() {
+ return delegate.getEncodedCQCounter();
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/450dbc59/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
index 1ab8c86..1bcf808 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/KeyValueSchema.java
@@ -109,6 +109,8 @@ public class KeyValueSchema extends ValueSchema {
Field field = fields.get(i);
PDataType type = field.getDataType();
for (int j = 0; j < field.getCount(); j++) {
+ // TODO(samarth): this is the point where we look up the result tuple to figure out
+ // where exactly the value is.
if (expressions[index].evaluate(tuple, ptr) && ptr.getLength() > 0) { // Skip null values
if (index >= minNullableIndex) {
valueSet.set(index - minNullableIndex);