You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/05/02 22:16:12 UTC
phoenix git commit: V1 of the custom list implementation for taking
advantage of encoded qualifiers
Repository: phoenix
Updated Branches:
refs/heads/encodecolumns ba8d6abbc -> 6461e9594
V1 of the custom list implementation for taking advantage of encoded qualifiers
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6461e959
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6461e959
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6461e959
Branch: refs/heads/encodecolumns
Commit: 6461e959404ef28a171a4384dfa44ed973ac124a
Parents: ba8d6ab
Author: Samarth <sa...@salesforce.com>
Authored: Mon May 2 10:04:12 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Mon May 2 10:04:12 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/RowValueConstructorIT.java | 2 +-
.../phoenix/compile/ProjectionCompiler.java | 3 +-
.../coprocessor/BaseScannerRegionObserver.java | 2 +
.../GroupedAggregateRegionObserver.java | 23 +-
.../coprocessor/MetaDataEndpointImpl.java | 5 +-
.../UngroupedAggregateRegionObserver.java | 10 +-
.../filter/MultiKeyValueComparisonFilter.java | 2 +-
.../phoenix/iterate/BaseResultIterators.java | 58 +++-
.../apache/phoenix/schema/tuple/BaseTuple.java | 9 +
.../schema/tuple/BoundedSkipNullCellsList.java | 340 +++++++++++++++++++
.../phoenix/schema/tuple/DelegateTuple.java | 7 +
.../schema/tuple/MultiKeyValueTuple.java | 1 +
.../tuple/PositionBasedMultiKeyValueTuple.java | 89 +++++
.../org/apache/phoenix/schema/tuple/Tuple.java | 4 +
.../java/org/apache/phoenix/util/ScanUtil.java | 18 +
15 files changed, 553 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java
index ea91f4f..be8ec59 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java
@@ -90,7 +90,7 @@ public class RowValueConstructorIT extends BaseClientManagedTimeIT {
long ts = nextTimestamp();
String tenantId = getOrganizationId();
initATableValues(tenantId, getDefaultSplits(tenantId), null, ts);
- String query = "SELECT a_integer, x_integer FROM aTable WHERE ?=organization_id AND (a_integer, x_integer) >= (4, 4)";
+ String query = "SELECT a_integer, x_integer FROM aTable WHERE ?=organization_id AND a_integer >= 4 and x_integer >= 4";
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
Connection conn = DriverManager.getConnection(getUrl(), props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 2f0c377..5fa4c6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -624,8 +624,7 @@ public class ProjectionCompiler {
*/
private void compile() throws SQLException {
final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
-
- Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
+ Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
while (expressions.hasNext()) {
Expression expression = expressions.next();
expression.accept(new SingleAggregateFunctionVisitor() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9814a7d..18a2057 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -100,6 +100,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
public final static String SCAN_OFFSET = "_RowOffset";
+ public final static String MIN_QUALIFIER = "_MinQualifier";
+ public final static String MAX_QUALIFIER = "_MaxQualifier";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index d03d0e9..52a25d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_
import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -39,7 +40,6 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
@@ -64,7 +65,10 @@ import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.BoundedSkipNullCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.IndexUtil;
@@ -381,7 +385,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
estDistVals = Math.max(MIN_DISTINCT_VALUES,
(int) (Bytes.toInt(estDistValsBytes) * 1.5f));
}
-
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ boolean useEncodedScheme = minMaxQualifiers != null;
final boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
@@ -392,18 +397,16 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
boolean success = false;
try {
boolean hasMore;
-
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Tuple result = useEncodedScheme ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
}
-
HRegion region = c.getEnvironment().getRegion();
region.startRegionOperation();
try {
synchronized (scanner) {
do {
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useEncodedScheme ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there are
@@ -438,7 +441,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
}
}
-
+
/**
* Used for an aggregate query in which the key order match the group by key order. In this
* case, we can do the aggregation as we scan, by detecting when the group by key changes.
@@ -453,6 +456,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by "
+ expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
}
+ final Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ final boolean useEncodedScheme = minMaxQualifiers != null;
return new BaseRegionScanner(scanner) {
private long rowCount = 0;
private ImmutableBytesWritable currentKey = null;
@@ -462,7 +467,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
boolean hasMore;
boolean atLimit;
boolean aggBoundary = false;
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Tuple result = useEncodedScheme ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
ImmutableBytesWritable key = null;
Aggregator[] rowAggregators = aggregators.getAggregators();
// If we're calculating no aggregate functions, we can exit at the
@@ -473,7 +478,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
try {
synchronized (scanner) {
do {
- List<Cell> kvs = new ArrayList<Cell>();
+ List<Cell> kvs = useEncodedScheme ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index eb557ea..58a637a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -918,9 +918,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
: Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
isNamespaceMappedKv.getValueOffset(), isNamespaceMappedKv.getValueLength()));
Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
- byte v = (byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(), storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength());
//TODO: change this once we start having other values for storage schemes
- StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme.fromSerializedValue(v);
+ StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme
+ .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
+ storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
List<PTable> indexes = new ArrayList<PTable>();
List<PName> physicalTables = new ArrayList<PName>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 950817f..7c3bd28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
+import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -90,7 +91,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
+import org.apache.phoenix.schema.tuple.BoundedSkipNullCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
@@ -284,7 +288,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
Aggregator[] rowAggregators = aggregators.getAggregators();
boolean hasMore;
boolean hasAny = false;
- MultiKeyValueTuple result = new MultiKeyValueTuple();
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan);
+ boolean useEncodedScheme = minMaxQualifiers != null;
+ Tuple result = useEncodedScheme ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
}
@@ -294,7 +300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
try {
synchronized (innerScanner) {
do {
- List<Cell> results = new ArrayList<Cell>();
+ List<Cell> results = useEncodedScheme ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>();
// Results are potentially returned even when the return value of s.next is false
// since this is an indication of whether or not there are more values after the
// ones returned
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index 8d03797..e8536b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -94,7 +94,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
refCount = foundColumns.size();
}
- public ReturnCode resolveColumn(Cell value) {
+ private ReturnCode resolveColumn(Cell value) {
// Always set key, in case we never find a key value column of interest,
// and our expression uses row key columns.
setKey(value);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index b749766..0dca8ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.iterate;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
@@ -70,7 +71,6 @@ import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
@@ -82,6 +82,7 @@ import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -224,13 +225,64 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
if(offset!=null){
ScanUtil.addOffsetAttribute(scan, offset);
}
-
+ if (EncodedColumnsUtil.usesEncodedColumnNames(table)) {
+ Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiers(scan, context);
+ if (minMaxQualifiers != null) {
+ scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getFirst()));
+ scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getSecond()));
+ }
+ }
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
}
}
-
+
+ private static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, StatementContext context) {
+ PTable table = context.getCurrentTable().getTable();
+ checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(table), "Method should only be used for tables using encoded column names");
+ Integer minQualifier = null;
+ Integer maxQualifier = null;
+ for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+ byte[] cq = whereCol.getSecond();
+ if (cq != null) {
+ int qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+ if (minQualifier == null && maxQualifier == null) {
+ minQualifier = maxQualifier = qualifier;
+ } else {
+ if (qualifier < minQualifier) {
+ minQualifier = qualifier;
+ } else if (qualifier > maxQualifier) {
+ maxQualifier = qualifier;
+ }
+ }
+ }
+ }
+ Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ if (entry.getValue() != null) {
+ for (byte[] cq : entry.getValue()) {
+ if (cq != null) {
+ int qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+ if (minQualifier == null && maxQualifier == null) {
+ minQualifier = maxQualifier = qualifier;
+ } else {
+ if (qualifier < minQualifier) {
+ minQualifier = qualifier;
+ } else if (qualifier > maxQualifier) {
+ maxQualifier = qualifier;
+ }
+ }
+ }
+ }
+ }
+ }
+ if (minQualifier == null) {
+ return null;
+ }
+ return new Pair<>(minQualifier, maxQualifier);
+ }
+
private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
// columnsTracker contain cf -> qualifiers which should get returned.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
index a8dc487..92371e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
@@ -17,6 +17,10 @@
*/
package org.apache.phoenix.schema.tuple;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+
public abstract class BaseTuple implements Tuple {
@@ -24,4 +28,9 @@ public abstract class BaseTuple implements Tuple {
public long getSequenceValue(int index) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void setKeyValues(List<Cell> values) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
new file mode 100644
index 0000000..a04adf7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.types.PInteger;
+
+import com.google.common.base.Preconditions;
+
+public class BoundedSkipNullCellsList implements List<Cell> {
+
+ private final int minQualifier;
+ private final int maxQualifier;
+ private final Cell[] array;
+ private int numNonNullElements;
+ private int firstNonNullElementIdx = -1;
+
+ public BoundedSkipNullCellsList(int minQualifier, int maxQualifier) {
+ Preconditions.checkArgument(minQualifier <= maxQualifier);
+ this.minQualifier = minQualifier;
+ this.maxQualifier = maxQualifier;
+ this.array = new Cell[maxQualifier - minQualifier + 1];
+ }
+
+ @Override
+ public int size() {
+ return numNonNullElements;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return numNonNullElements == 0;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ throwUnsupportedOperationException();
+ return false;
+ }
+
+
+ @Override
+ public Object[] toArray() {
+ throwUnsupportedOperationException();
+ return null;
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ throwUnsupportedOperationException();
+ return null;
+ }
+
+ @Override
+ public boolean add(Cell e) {
+ if (e == null) {
+ throw new NullPointerException();
+ }
+ int columnQualifier = (int)PInteger.INSTANCE.toObject(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());
+ checkQualifierRange(columnQualifier);
+ int idx = getArrayIndex(columnQualifier);
+ array[idx] = e;
+ numNonNullElements++;
+ if (firstNonNullElementIdx == -1) {
+ firstNonNullElementIdx = idx;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ if (o == null) {
+ throw new NullPointerException();
+ }
+ Cell e = (Cell)o;
+ int i = 0;
+ while (i < array.length) {
+ if (array[i] != null && array[i].equals(e)) {
+ array[i] = null;
+ numNonNullElements--;
+ if (numNonNullElements == 0) {
+ firstNonNullElementIdx = -1;
+ } else if (firstNonNullElementIdx == i) {
+ // the element being removed was the first non-null element we knew
+ while (i < array.length && (array[i]) == null) {
+ i++;
+ }
+ if (i < array.length) {
+ firstNonNullElementIdx = maxQualifier;
+ } else {
+ firstNonNullElementIdx = -1;
+ }
+ }
+ return true;
+ }
+ i++;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ throwUnsupportedOperationException();
+ return false;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends Cell> c) {
+ boolean changed = false;
+ for (Cell cell : c) {
+ if (c == null) {
+ throw new NullPointerException();
+ }
+ changed |= add(cell);
+ }
+ return changed;
+ }
+
+ @Override
+ public boolean addAll(int index, Collection<? extends Cell> c) {
+ throwUnsupportedOperationException();
+ return false;
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throwUnsupportedOperationException();
+ return false;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throwUnsupportedOperationException();
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ for (int i = 0; i < array.length; i++) {
+ array[i] = null;
+ }
+ numNonNullElements = 0;
+ }
+
+ @Override
+ public Cell get(int index) {
+ //TODO: samarth how can we support this? It is always assumed that the
+ // user expects to get something back from the list and we would end up returning null
+ // here. Do we just add the
+ throwUnsupportedOperationException();
+ rangeCheck(index);
+ return array[index];
+ }
+
+ public Cell getCellForColumnQualifier(int columnQualifier) {
+ int idx = getArrayIndex(columnQualifier);
+ return array[idx];
+ }
+
+ @Override
+ public Cell set(int index, Cell element) {
+ //TODO: samarth how can we support this?
+ throwUnsupportedOperationException();
+ if (element == null) {
+ throw new NullPointerException();
+ }
+ rangeCheck(index);
+ int idx = minQualifier + index;
+ Cell prev = array[idx];
+ array[idx] = element;
+ if (prev == null) {
+ numNonNullElements++;
+ }
+ return prev;
+ }
+
+ @Override
+ public void add(int index, Cell element) {
+ throwUnsupportedOperationException();
+ }
+
+ @Override
+ public Cell remove(int index) {
+ throwUnsupportedOperationException();
+ return null;
+ }
+
+ @Override
+ public int indexOf(Object o) {
+ throwUnsupportedOperationException();
+ return 0;
+ }
+
+ @Override
+ public int lastIndexOf(Object o) {
+ throwUnsupportedOperationException();
+ return 0;
+ }
+
+ @Override
+ public ListIterator<Cell> listIterator() {
+ throwUnsupportedOperationException();
+ return null;
+ }
+
+ @Override
+ public ListIterator<Cell> listIterator(int index) {
+ throwUnsupportedOperationException();
+ return null;
+ }
+
+ @Override
+ public List<Cell> subList(int fromIndex, int toIndex) {
+ throwUnsupportedOperationException();
+ return null;
+ }
+
+ private void checkQualifierRange(int qualifier) {
+ if (qualifier < minQualifier || qualifier > maxQualifier) {
+ throw new IndexOutOfBoundsException("Qualifier is out of the range. Min: " + minQualifier + " Max: " + maxQualifier);
+ }
+ }
+
+ private void rangeCheck(int index) {
+ if (index < 0 || index >= array.length) {
+ throw new IndexOutOfBoundsException();
+ }
+ }
+
+ private void throwUnsupportedOperationException() {
+ throw new UnsupportedOperationException("Operation not supported because Samarth didn't implement it");
+ }
+
+ @Override
+ public Iterator<Cell> iterator() {
+ return new Itr();
+ }
+
+ private class Itr implements Iterator<Cell> {
+ private Cell current;
+ private int currentIdx = 0;
+ private boolean exhausted = false;
+ private Itr() {
+ moveToNextNonNullCell(true);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !exhausted;
+ }
+
+ @Override
+ public Cell next() {
+ if (exhausted) {
+ return null;
+ }
+ Cell next = current;
+ moveToNextNonNullCell(false);
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throwUnsupportedOperationException();
+ }
+
+ private void moveToNextNonNullCell(boolean init) {
+ int i = init ? 0 : currentIdx + 1;
+ while (i < array.length && (current = array[i]) == null) {
+ i++;
+ }
+ if (i < array.length) {
+ currentIdx = i;
+ } else {
+ currentIdx = -1;
+ exhausted = true;
+ }
+ }
+
+ }
+
+ public Cell getFirstCell() {
+ if (firstNonNullElementIdx == -1) {
+ throw new IllegalStateException("List doesn't have any non-null cell present");
+ }
+ return array[firstNonNullElementIdx];
+ }
+
+ private int getArrayIndex(int columnQualifier) {
+ return columnQualifier - minQualifier;
+ }
+
+// private Cell setCell(int columnQualifier, Cell e) {
+//
+// }
+
+ public static void main (String args[]) throws Exception {
+ BoundedSkipNullCellsList list = new BoundedSkipNullCellsList(0, 10); // list of eleven elements
+ System.out.println(list.size());
+ byte[] row = Bytes.toBytes("row");
+ byte[] cf = Bytes.toBytes("cf");
+ list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(0)));
+ list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5)));
+ list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(10)));
+
+ for (Cell c : list) {
+ System.out.println(c);
+ }
+ System.out.println(list.size());
+ System.out.println(list.get(0));
+ System.out.println(list.get(5));
+ System.out.println(list.get(10));
+ System.out.println(list.get(1));
+ System.out.println(list.remove(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5))));
+ System.out.println(list.get(5));
+ System.out.println(list.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
index 58b1eda..3430f5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.schema.tuple;
+import java.util.List;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -61,4 +63,9 @@ public class DelegateTuple implements Tuple {
public long getSequenceValue(int index) {
return delegate.getSequenceValue(index);
}
+
+ /** Forwards to the wrapped tuple; see {@link Tuple#setKeyValues(List)}. */
+ @Override
+ public void setKeyValues(List<Cell> values) {
+ delegate.setKeyValues(values);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
index 53f155b..d946870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java
@@ -36,6 +36,7 @@ public class MultiKeyValueTuple extends BaseTuple {
}
/** Caller must not modify the list that is passed here */
+ @Override
public void setKeyValues(List<Cell> values) {
this.values = values;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
new file mode 100644
index 0000000..a1fe549
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.tuple;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.types.PInteger;
+
+/**
+ * Tuple backed by a {@link BoundedSkipNullCellsList}, which stores cells by
+ * their encoded (integer) column qualifier. This allows positional lookup of
+ * a cell by qualifier instead of the comparison-based search used by
+ * {@link MultiKeyValueTuple}.
+ */
+public class PositionBasedMultiKeyValueTuple extends BaseTuple {
+    private BoundedSkipNullCellsList values;
+
+    public PositionBasedMultiKeyValueTuple() {}
+
+    /** Caller must not modify the list that is passed here */
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        // A plain List would defeat positional lookup. Fail fast with a clear
+        // message rather than relying on an assert (disabled by default at
+        // runtime) followed by a confusing ClassCastException.
+        if (!(values instanceof BoundedSkipNullCellsList)) {
+            throw new IllegalArgumentException(
+                    "Expected a BoundedSkipNullCellsList but got "
+                            + (values == null ? "null" : values.getClass().getName()));
+        }
+        this.values = (BoundedSkipNullCellsList) values;
+    }
+
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        // All cells of a tuple share the same row key, so the first non-null
+        // cell is sufficient to expose it.
+        Cell value = values.getFirstCell();
+        ptr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength());
+    }
+
+    @Override
+    public boolean isImmutable() {
+        return true;
+    }
+
+    @Override
+    public Cell getValue(byte[] family, byte[] qualifier) {
+        // Qualifiers are encoded integers; decode and look the cell up by
+        // position. The family is ignored here — the backing list is keyed by
+        // qualifier only.
+        return values.getCellForColumnQualifier((int) PInteger.INSTANCE.toObject(qualifier));
+    }
+
+    @Override
+    public String toString() {
+        return values.toString();
+    }
+
+    @Override
+    public int size() {
+        return values.size();
+    }
+
+    @Override
+    public Cell getValue(int index) {
+        return values.get(index);
+    }
+
+    @Override
+    public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr) {
+        Cell kv = getValue(family, qualifier);
+        if (kv == null) {
+            return false;
+        }
+        ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
index 61b2a4f..e4a887b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/Tuple.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.schema.tuple;
+import java.util.List;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -87,4 +89,6 @@ public interface Tuple {
* @return the current or next sequence value
*/
public long getSequenceValue(int index);
+
+ public void setKeyValues(List<Cell> values);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6461e959/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 46589b9..8174f7b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableComparator;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.ScanRanges;
@@ -62,6 +63,7 @@ import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PVarbinary;
import com.google.common.collect.Iterators;
@@ -807,5 +809,21 @@ public class ScanUtil {
public static void addOffsetAttribute(Scan scan, Integer offset) {
scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, Bytes.toBytes(offset));
}
+
+    /**
+     * Extracts the min/max encoded column qualifiers that were stashed on the
+     * scan as attributes.
+     *
+     * @param scan the scan whose MIN_QUALIFIER/MAX_QUALIFIER attributes to read
+     * @return a (min, max) pair, or null when no MIN_QUALIFIER attribute is
+     *         present. The max component may itself be null if only the min
+     *         was set — callers must be prepared to handle that.
+     */
+    public static Pair<Integer, Integer> getMinMaxQualifiersFromScan(Scan scan) {
+        byte[] minQualifier = scan.getAttribute(BaseScannerRegionObserver.MIN_QUALIFIER);
+        // Bail out early: without a min qualifier there is nothing to return,
+        // and decoding the max attribute first would be wasted work.
+        if (minQualifier == null) {
+            return null;
+        }
+        Integer minQ = (Integer) PInteger.INSTANCE.toObject(minQualifier);
+        byte[] maxQualifier = scan.getAttribute(BaseScannerRegionObserver.MAX_QUALIFIER);
+        Integer maxQ =
+                maxQualifier == null ? null : (Integer) PInteger.INSTANCE.toObject(maxQualifier);
+        return new Pair<>(minQ, maxQ);
+    }
}
\ No newline at end of file