You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/03/09 23:02:25 UTC
[12/50] [abbrv] phoenix git commit: PHOENIX-1598 Encode column names
to save space and improve performance
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index fdf5498..a41dbf0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -65,7 +65,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
@@ -244,8 +243,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
for (ColumnReference ref : mutableColumns) {
scan.addColumn(ref.getFamily(), ref.getQualifier());
}
+ /*
+ * Indexes inherit the storage scheme of the data table which means all the indexes have the same
+ * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
+ * supporting new indexes over existing data tables to have a different storage scheme than the data
+ * table.
+ */
+ byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+
// Project empty key value column
- scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+ scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
scanRanges.initializeScan(scan);
TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -296,7 +303,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
if (scanner != null) {
Result result;
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+ ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+ .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
// Process existing data table rows by removing the old index row and adding the new index row
while ((result = scanner.next()) != null) {
Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
@@ -324,7 +332,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
// to generate point delete markers for all index rows that were added. We don't have Tephra
// manage index rows in change sets because we don't want to be hit with the additional
// memory hit and do not need to do conflict detection on index rows.
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+ ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
while ((result = scanner.next()) != null) {
Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
// Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 51587f1..d563bc2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,12 +17,17 @@
*/
package org.apache.phoenix.iterate;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.schema.PTable.IndexType.LOCAL;
+import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
+import static org.apache.phoenix.util.ScanUtil.hasDynamicColumns;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -30,6 +35,7 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -69,6 +75,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.parse.FilterableStatement;
@@ -76,12 +83,13 @@ import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
@@ -90,6 +98,8 @@ import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PrefixByteCodec;
import org.apache.phoenix.util.PrefixByteDecoder;
@@ -157,7 +167,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return true;
}
- private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) {
+ private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException {
StatementContext context = plan.getContext();
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
@@ -208,7 +218,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Project empty key value unless the column family containing it has
// been projected in its entirety.
if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
- scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+ scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
}
}
}
@@ -226,7 +236,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if(offset!=null){
ScanUtil.addOffsetAttribute(scan, offset);
}
-
int cols = plan.getGroupBy().getOrderPreservingColumnCount();
if (cols > 0 && keyOnlyFilter &&
!plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
@@ -241,13 +250,93 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
ScanUtil.andFilterAtEnd(scan, new PageFilter(plan.getLimit()));
}
}
-
+ scan.setAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME, new byte[]{table.getEncodingScheme().getSerializedMetadataValue()});
+ scan.setAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME, new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()});
+ // we use this flag on the server side to determine which value column qualifier to use in the key value we return from server.
+ scan.setAttribute(BaseScannerRegionObserver.USE_NEW_VALUE_COLUMN_QUALIFIER, Bytes.toBytes(true));
+ // When analyzing the table, there is no look up for key values being done.
+ // So there is no point setting the range.
+ if (EncodedColumnsUtil.setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) {
+ Pair<Integer, Integer> range = getEncodedQualifierRange(scan, context);
+ if (range != null) {
+ scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, Bytes.toBytes(range.getFirst()));
+ scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, Bytes.toBytes(range.getSecond()));
+ ScanUtil.setQualifierRangesOnFilter(scan, range);
+ }
+ }
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
}
}
+
+ private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, StatementContext context)
+ throws SQLException {
+ PTable table = context.getCurrentTable().getTable();
+ QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+ checkArgument(encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS,
+ "Method should only be used for tables using encoded column names");
+ Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
+ for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+ byte[] cq = whereCol.getSecond();
+ if (cq != null) {
+ int qualifier = table.getEncodingScheme().decode(cq);
+ determineQualifierRange(qualifier, minMaxQualifiers);
+ }
+ }
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+
+ Map<String, Pair<Integer, Integer>> qualifierRanges = EncodedColumnsUtil.getFamilyQualifierRanges(table);
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ if (entry.getValue() != null) {
+ for (byte[] cq : entry.getValue()) {
+ if (cq != null) {
+ int qualifier = table.getEncodingScheme().decode(cq);
+ determineQualifierRange(qualifier, minMaxQualifiers);
+ }
+ }
+ } else {
+ /*
+ * A null qualifier set means every column of this family is being projected, so fold in
+ * the family's whole qualifier range (from qualifierRanges) instead of individual qualifiers.
+ */
+ String family = Bytes.toString(entry.getKey());
+ if (table.getType() == INDEX && table.getIndexType() == LOCAL && !IndexUtil.isLocalIndexFamily(family)) {
+ //TODO: samarth confirm with James why a local index needs its family mapped via getLocalIndexColumnFamily here
+ family = IndexUtil.getLocalIndexColumnFamily(family);
+ }
+ Pair<Integer, Integer> range = qualifierRanges.get(family);
+ if (range != null) {
+ determineQualifierRange(range.getFirst(), minMaxQualifiers);
+ determineQualifierRange(range.getSecond(), minMaxQualifiers);
+ }
+ }
+ }
+ if (minMaxQualifiers.getFirst() == null) {
+ return null;
+ }
+ return minMaxQualifiers;
+ }
+ /**
+ * Widens the running (min, max) qualifier range, in place, so that it includes the given qualifier.
+ *
+ * @param qualifier decoded column qualifier to fold into the range
+ * @param minMaxQualifiers running range; a null first element means no qualifier has been recorded yet
+ */
+ private static void determineQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) {
+ if (minMaxQualifiers.getFirst() == null) {
+ minMaxQualifiers.setFirst(qualifier);
+ minMaxQualifiers.setSecond(qualifier);
+ } else {
+ if (minMaxQualifiers.getFirst() > qualifier) {
+ minMaxQualifiers.setFirst(qualifier);
+ } else if (minMaxQualifiers.getSecond() < qualifier) {
+ minMaxQualifiers.setSecond(qualifier);
+ }
+ }
+ }
+
private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
// columnsTracker contain cf -> qualifiers which should get returned.
@@ -255,6 +344,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
int referencedCfCount = familyMap.size();
+ QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+ ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
+ BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null;
boolean filteredColumnNotInProjection = false;
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] filteredFamily = whereCol.getFirst();
@@ -295,6 +387,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
cols = new TreeSet<ImmutableBytesPtr>();
for (byte[] q : qs) {
cols.add(new ImmutableBytesPtr(q));
+ if (trackedColumnsBitset != null) {
+ int qualifier = encodingScheme.decode(q);
+ trackedColumnsBitset.set(qualifier);
+ }
}
}
columnsTracker.put(cf, cols);
@@ -343,8 +439,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// in the scan in this case. We still want the other optimization that causes
// the ExplicitColumnTracker not to be used, though.
if (!statement.isAggregate() && filteredColumnNotInProjection) {
- ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
- columnsTracker, conditionOnlyCfs));
+ ScanUtil.andFilterAtEnd(scan,
+ trackedColumnsBitset != null ? new EncodedQualifiersColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), trackedColumnsBitset, conditionOnlyCfs, table.getEncodingScheme()) : new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+ columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme())));
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator {
};
}
- private final static Tuple UNINITIALIZED = new ResultTuple();
+ private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
private Tuple next = UNINITIALIZED;
abstract protected Tuple advance() throws SQLException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
return this.index;
}
+ @Override
public int size() {
if (flushBuffer)
return flushedCount;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index da75bb7..5afdfea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
}
this.byteSize = queueEntries.getByteSize();
} catch (IOException e) {
- throw new SQLException("", e);
+ ServerUtil.createIOException(e.getMessage(), e);
} finally {
delegate.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 52fbe9c..a5a40e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,27 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.ServerUtil;
public class RegionScannerResultIterator extends BaseResultIterator {
private final RegionScanner scanner;
-
- public RegionScannerResultIterator(RegionScanner scanner) {
+ private final Pair<Integer, Integer> minMaxQualifiers;
+ private final boolean useQualifierAsIndex;
+ private final QualifierEncodingScheme encodingScheme;
+
+ public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
this.scanner = scanner;
+ this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+ this.minMaxQualifiers = minMaxQualifiers;
+ this.encodingScheme = encodingScheme;
}
@Override
@@ -43,7 +54,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
synchronized (scanner) {
try {
// TODO: size
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned
@@ -54,7 +65,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
}
// We instantiate a new tuple because in all cases currently we hang on to it
// (i.e. to compute and hold onto the TopN).
- MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+ Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
tuple.setKeyValues(results);
return tuple;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 5142b57..0e62164 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+
public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
@@ -322,6 +322,15 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
/** Version below which we fall back on the generic KeyValueBuilder */
public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
+ public static final String IMMUTABLE_STORAGE_SCHEME = "IMMUTABLE_STORAGE_SCHEME";
+ public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(IMMUTABLE_STORAGE_SCHEME);
+ public static final String ENCODING_SCHEME = "ENCODING_SCHEME";
+ public static final byte[] ENCODING_SCHEME_BYTES = Bytes.toBytes(ENCODING_SCHEME);
+ public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+ public static final byte[] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
+ public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+ public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
this.connection = connection;
@@ -595,9 +604,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
newCells.addAll(cells);
newCells.add(kv);
Collections.sort(newCells, KeyValue.COMPARATOR);
- resultTuple.setResult(Result.create(newCells));
+ tuple = new ResultTuple(Result.create(newCells));
}
-
return tuple;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
private final static String STRING_FALSE = "0";
private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
private final static Integer INTEGER_FALSE = Integer.valueOf(0);
- private final static Tuple BEFORE_FIRST = new ResultTuple();
+ private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
private final ResultIterator scanner;
private final RowProjector rowProjector;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 3072736..a8ddd62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -56,7 +56,7 @@ public class HashCacheFactory implements ServerCacheFactory {
}
@Override
- public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException {
+ public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
try {
// This reads the uncompressed length from the front of the compressed input
int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index b12326a..278489d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
not care about it
*/
private void initColumnIndexes() throws SQLException {
- columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+ columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
int columnIndex = 0;
for(int index = 0; index < logicalNames.size(); index++) {
PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -216,18 +217,22 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
byte[] family = new byte[0];
- if (c.getFamilyName() != null) // Skip PK column
+ byte[] cq;
+ if (!SchemaUtil.isPKColumn(c)) {
family = c.getFamilyName().getBytes();
- byte[] name = c.getName().getBytes();
- byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+ cq = c.getColumnQualifierBytes();
+ } else {
+ cq = c.getName().getBytes();
+ }
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
if (!columnIndexes.containsKey(cfn)) {
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
}
}
byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
- byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
- QueryConstants.EMPTY_COLUMN_BYTES);
+ byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
}
@@ -243,9 +248,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
private int findIndex(Cell cell) throws IOException {
byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength());
- byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+ byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
- byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+ byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
if(columnIndexes.containsKey(cfn)) {
return columnIndexes.get(cfn);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 15d6d2f..c529afe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -89,7 +90,7 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+ Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
columnIndexes = new HashMap<>();
int columnIndex = 0;
for (int index = 0; index < logicalNames.size(); index++) {
@@ -98,12 +99,16 @@ public class FormatToKeyValueReducer
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
byte[] family = new byte[0];
- if (c.getFamilyName() != null) {
+ byte[] cq;
+ if (!SchemaUtil.isPKColumn(c)) {
family = c.getFamilyName().getBytes();
+ cq = c.getColumnQualifierBytes();
+ } else {
+ // TODO: samarth verify if this is the right thing to do here.
+ cq = c.getName().getBytes();
}
- byte[] name = c.getName().getBytes();
- byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
- Pair<byte[], byte[]> pair = new Pair(family, name);
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+ Pair<byte[], byte[]> pair = new Pair<>(family, cq);
if (!indexMap.containsKey(cfn)) {
indexMap.put(cfn, new Integer(columnIndex));
columnIndexes.put(new Integer(columnIndex), pair);
@@ -111,8 +116,8 @@ public class FormatToKeyValueReducer
}
}
byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
- Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants
- .EMPTY_COLUMN_BYTES);
+ byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+ Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue);
columnIndexes.put(new Integer(columnIndex), pair);
columnIndex++;
}
@@ -123,18 +128,17 @@ public class FormatToKeyValueReducer
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- ImmutableBytesWritable rowKey = key.getRowkey();
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available() != 0) {
byte type = input.readByte();
int index = WritableUtils.readVInt(input);
ImmutableBytesWritable family;
- ImmutableBytesWritable name;
+ ImmutableBytesWritable cq;
ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
Pair<byte[], byte[]> pair = columnIndexes.get(index);
family = new ImmutableBytesWritable(pair.getFirst());
- name = new ImmutableBytesWritable(pair.getSecond());
+ cq = new ImmutableBytesWritable(pair.getSecond());
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
@@ -145,10 +149,10 @@ public class FormatToKeyValueReducer
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
- kv = builder.buildPut(key.getRowkey(), family, name, value);
+ kv = builder.buildPut(key.getRowkey(), family, cq, value);
break;
case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
+ kv = builder.buildDeleteColumns(key.getRowkey(), family, cq);
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
@@ -164,4 +168,4 @@ public class FormatToKeyValueReducer
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 47a38a7..54dc748 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -113,7 +113,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
mutations.add(put);
}
@@ -122,7 +122,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
mutations.add(del);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 70c0575..2329432 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -23,11 +23,34 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
@@ -41,6 +64,7 @@ import java.lang.ref.WeakReference;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -160,11 +184,13 @@ import org.apache.phoenix.schema.ColumnAlreadyExistsException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.EmptySequenceCacheException;
import org.apache.phoenix.schema.FunctionNotFoundException;
+import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PMetaDataImpl;
import org.apache.phoenix.schema.PName;
@@ -178,6 +204,7 @@ import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceKey;
+import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
@@ -187,7 +214,9 @@ import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PUnsignedTinyint;
+import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
@@ -588,7 +617,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
latestMetaDataLock.notifyAll();
}
}
-
@Override
public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
synchronized (latestMetaDataLock) {
@@ -2708,6 +2736,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0);
clearCache();
}
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) {
+ metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3);
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2,
+ PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " "
+ + PTinyint.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1,
+ PhoenixDatabaseMetaData.ENCODING_SCHEME + " "
+ + PTinyint.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(
+ metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0,
+ PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " "
+ + PInteger.INSTANCE.getSqlTypeName());
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0);
+ clearCache();
+ }
}
@@ -2844,6 +2897,84 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
+
+ // Special upgrade step for 4.10: adds the COLUMN_QUALIFIER column to the system catalog table using a new connection opened at the given SCN timestamp, and returns that new connection.
+ private PhoenixConnection addColumnQualifierColumn(PhoenixConnection oldMetaConnection, Long timestamp) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
+ // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
+ PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
+ PTable sysCatalogPTable = metaConnection.getTable(new PTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+ int numColumns = sysCatalogPTable.getColumns().size();
+ try (PreparedStatement mutateTable = metaConnection.prepareStatement(MetaDataClient.MUTATE_TABLE)) {
+ mutateTable.setString(1, null);
+ mutateTable.setString(2, SYSTEM_CATALOG_SCHEMA);
+ mutateTable.setString(3, SYSTEM_CATALOG_TABLE);
+ mutateTable.setString(4, PTableType.SYSTEM.getSerializedValue());
+ mutateTable.setLong(5, sysCatalogPTable.getSequenceNumber() + 1); // current sequence number plus one
+ mutateTable.setInt(6, numColumns + 1); // current column count plus the one column being added
+ mutateTable.execute();
+ }
+ List<Mutation> tableMetadata = new ArrayList<>();
+ tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond()); // collect the mutations produced by the MUTATE_TABLE statement
+ metaConnection.rollback(); // mutations were captured above; they are handed to addColumn() below rather than committed here
+ PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"),
+ PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns,
+ SortOrder.ASC, null, null, false, null, false, false,
+ Bytes.toBytes("COLUMN_QUALIFIER"));
+ String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+ TENANT_ID + "," +
+ TABLE_SCHEM + "," +
+ TABLE_NAME + "," +
+ COLUMN_NAME + "," +
+ COLUMN_FAMILY + "," +
+ DATA_TYPE + "," +
+ NULLABLE + "," +
+ COLUMN_SIZE + "," +
+ DECIMAL_DIGITS + "," +
+ ORDINAL_POSITION + "," +
+ SORT_ORDER + "," +
+ DATA_TABLE_NAME + "," +
+ ARRAY_SIZE + "," +
+ VIEW_CONSTANT + "," +
+ IS_VIEW_REFERENCED + "," +
+ PK_NAME + "," +
+ KEY_SEQ + "," +
+ COLUMN_DEF + "," +
+ IS_ROW_TIMESTAMP +
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ try (PreparedStatement colUpsert = metaConnection.prepareStatement(upsertColumnMetadata)) {
+ colUpsert.setString(1, null);
+ colUpsert.setString(2, SYSTEM_CATALOG_SCHEMA);
+ colUpsert.setString(3, SYSTEM_CATALOG_TABLE);
+ colUpsert.setString(4, "COLUMN_QUALIFIER");
+ colUpsert.setString(5, DEFAULT_COLUMN_FAMILY);
+ colUpsert.setInt(6, column.getDataType().getSqlType());
+ colUpsert.setInt(7, ResultSetMetaData.columnNullable);
+ colUpsert.setNull(8, Types.INTEGER);
+ colUpsert.setNull(9, Types.INTEGER);
+ colUpsert.setInt(10, sysCatalogPTable.getBucketNum() != null ? numColumns : (numColumns + 1)); // ORDINAL_POSITION (10th column in the list above)
+ colUpsert.setInt(11, SortOrder.ASC.getSystemValue());
+ colUpsert.setString(12, null);
+ colUpsert.setNull(13, Types.INTEGER);
+ colUpsert.setBytes(14, null);
+ colUpsert.setBoolean(15, false);
+ colUpsert.setString(16, sysCatalogPTable.getPKName() == null ? null : sysCatalogPTable.getPKName().getString());
+ colUpsert.setNull(17, Types.SMALLINT);
+ colUpsert.setNull(18, Types.VARCHAR);
+ colUpsert.setBoolean(19, false);
+ colUpsert.execute();
+ }
+ tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond()); // collect the mutations produced by the column upsert
+ metaConnection.rollback();
+ metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column));
+ metaConnection.removeTable(null, SYSTEM_CATALOG_NAME, null, timestamp);
+ ConnectionQueryServicesImpl.this.removeTable(null,
+ SYSTEM_CATALOG_NAME, null,
+ timestamp);
+ clearCache();
+ return metaConnection;
+ }
private void createSnapshot(String snapshotName, String tableName)
throws SQLException {
@@ -4137,4 +4268,4 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public Configuration getConfiguration() {
return config;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 7c78083..47ef954 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -93,7 +93,6 @@ import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
/**
*
* Implementation of ConnectionQueryServices used in testing where no connection to
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 8e2dc1a..6f105f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -31,6 +31,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
@@ -40,11 +42,13 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POST_KEY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
@@ -117,8 +121,8 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.util.ByteUtil;
/**
@@ -149,23 +153,30 @@ public interface QueryConstants {
public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
- public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
- public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
- public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes();
- public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes();
-
public static final long AGG_TIMESTAMP = HConstants.LATEST_TIMESTAMP;
/**
* Key used for a single row aggregation where there is no group by
*/
public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a");
- public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME;
- public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME;
-
- public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("a");
- // Use empty byte array for column qualifier so as not to accidentally conflict with any other columns
- public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY;
+
+ /** BEGIN Set of reserved column qualifiers **/
+
+ public static final String RESERVED_COLUMN_FAMILY = "_v";
+ public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = Bytes.toBytes(RESERVED_COLUMN_FAMILY);
+
+ public static final byte[] VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+ public static final byte[] VALUE_COLUMN_QUALIFIER = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(1);
+
+ public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+ public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(2);
+
+ public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
+ public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
+ public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes();
+ public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes();
+ /** END Set of reserved column qualifiers **/
+
public static final byte[] TRUE = new byte[] {1};
/**
@@ -192,15 +203,25 @@ public interface QueryConstants {
public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
EMPTY_COLUMN_BYTES);
+ public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
+ public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(ENCODED_EMPTY_COLUMN_NAME);
public final static String EMPTY_COLUMN_VALUE = "x";
public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
EMPTY_COLUMN_VALUE_BYTES);
-
+ public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE;
+ public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+ public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+ ENCODED_EMPTY_COLUMN_VALUE_BYTES);
public static final String DEFAULT_COLUMN_FAMILY = "0";
public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
DEFAULT_COLUMN_FAMILY_BYTES);
+ // column qualifier of the single key value used to store all columns for the SINGLE_CELL_ARRAY_WITH_OFFSETS storage scheme
+ public static final String SINGLE_KEYVALUE_COLUMN_QUALIFIER = "1";
+ public final static byte[] SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(SINGLE_KEYVALUE_COLUMN_QUALIFIER);
+ public static final ImmutableBytesPtr SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR = new ImmutableBytesPtr(
+ SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#";
public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
@@ -222,6 +243,12 @@ public interface QueryConstants {
public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue();
public static final int DIVERGED_VIEW_BASE_COLUMN_COUNT = -100;
public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1;
+
+ /**
+ * We mark counter values 0 to 10 as reserved. Value 0 is used by {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10
+ * are reserved for special column qualifiers returned by Phoenix co-processors.
+ */
+ public static final int ENCODED_CQ_COUNTER_INITIAL_VALUE = 11;
public static final String CREATE_TABLE_METADATA =
// Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists
// exception and add columns to the SYSTEM.TABLE dynamically.
@@ -289,6 +316,10 @@ public interface QueryConstants {
AUTO_PARTITION_SEQ + " VARCHAR," +
APPEND_ONLY_SCHEMA + " BOOLEAN," +
GUIDE_POSTS_WIDTH + " BIGINT," +
+ COLUMN_QUALIFIER + " VARBINARY," +
+ IMMUTABLE_STORAGE_SCHEME + " TINYINT, " +
+ ENCODING_SCHEME + " TINYINT, " +
+ COLUMN_QUALIFIER_COUNTER + " INTEGER, " +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
+ TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
@@ -363,5 +394,5 @@ public interface QueryConstants {
public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
public static final String LAST_SCAN = "LAST_SCAN";
-
+ public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2035de8..8f0b06e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -240,6 +240,10 @@ public interface QueryServices extends SQLCloseable {
public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size";
public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS =
"phoenix.client.connection.max.duration";
+ public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib";
+ public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme";
+ public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme";
+
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index de0796f..15ea956 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -93,6 +93,8 @@ import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTableRefFactory;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.DateUtil;
@@ -251,7 +253,6 @@ public class QueryServicesOptions {
public static final long DEFAULT_QUERY_SERVER_UGI_CACHE_MAX_SIZE = 1000L;
public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_INITIAL_SIZE = 100;
public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_CONCURRENCY = 10;
-
public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;
@@ -267,6 +268,9 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100;
public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
+ public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.getSerializedMetadataValue();
+ public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString();
+ public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.ONE_CELL_PER_COLUMN.toString();
@SuppressWarnings("serial")
public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 1d772b4..c73b860 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -17,6 +17,10 @@
*/
package org.apache.phoenix.schema;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.http.annotation.Immutable;
import org.apache.phoenix.compile.ExpressionCompiler;
@@ -25,19 +29,17 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.function.DefaultValueExpression;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Arrays;
-
/**
*
@@ -59,7 +61,7 @@ public class ColumnRef {
}
public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException {
- this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition());
+ this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition());
}
public ColumnRef(TableRef tableRef, int columnPosition) {
@@ -123,7 +125,8 @@ public class ColumnRef {
return new ProjectedColumnExpression(column, table, displayName);
}
- Expression expression = new KeyValueColumnExpression(column, displayName);
+ Expression expression = table.getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS ?
+ new SingleCellColumnExpression(column, displayName, table.getEncodingScheme()) : new KeyValueColumnExpression(column, displayName);
if (column.getExpressionStr() != null) {
String url = PhoenixRuntime.JDBC_PROTOCOL
@@ -140,7 +143,6 @@ public class ColumnRef {
return new DefaultValueExpression(Arrays.asList(expression, defaultExpression));
}
}
-
return expression;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java
new file mode 100644
index 0000000..5ae72d1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Interface for decoding column values that are stored in a byte[].
+ */
+public interface ColumnValueDecoder {
+ /**
+ * Sets {@code ptr} to the column value at the given index.
+ * @return {@code false} if the column value is absent (used to support DEFAULT expressions), {@code true} otherwise
+ */
+ boolean decode(ImmutableBytesWritable ptr, int index);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java
new file mode 100644
index 0000000..5e930bd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+
+
+/**
+ * Interface to encode column values into a serialized byte[] that will be stored in a single cell.
+ * The last byte of the serialized byte[] should be the serialized value of the {@link ImmutableStorageScheme}
+ * that was used.
+ */
+public interface ColumnValueEncoder {
+
+ /**
+ * Appends a column value to the array (presumably {@code length} bytes of {@code bytes} from {@code offset}).
+ */
+ void appendValue(byte[] bytes, int offset, int length);
+
+ /**
+ * Appends a value that is not present to the array (used to support DEFAULT expressions).
+ */
+ void appendAbsentValue();
+
+ /**
+ * @return the encoded byte[] that contains the serialized column values
+ */
+ byte[] encode();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index aca8219..5c9cc2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -99,4 +99,8 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
public boolean equals(Object o) {
return getDelegate().equals(o);
}
+ @Override
+ public byte[] getColumnQualifierBytes() {
+ return getDelegate().getColumnQualifierBytes();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 7d39dfe..12258e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -96,8 +96,8 @@ public class DelegateTable implements PTable {
}
@Override
- public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
- return delegate.getColumn(name);
+ public PColumn getColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+ return delegate.getColumnForColumnName(name);
}
@Override
@@ -290,4 +290,24 @@ public class DelegateTable implements PTable {
public boolean equals(Object obj) {
return delegate.equals(obj);
}
+
+ @Override
+ public ImmutableStorageScheme getImmutableStorageScheme() {
+ return delegate.getImmutableStorageScheme();
+ }
+
+ @Override
+ public PColumn getColumnForColumnQualifier(byte[] cf, byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+ return delegate.getColumnForColumnQualifier(cf, cq);
+ }
+
+ @Override
+ public EncodedCQCounter getEncodedCQCounter() {
+ return delegate.getEncodedCQCounter();
+ }
+
+ @Override
+ public QualifierEncodingScheme getEncodingScheme() {
+ return delegate.getEncodingScheme();
+ }
}