You are viewing a plain-text version of this content. (The canonical link that accompanied this notice was not preserved in the plain-text copy.)
Posted to commits@phoenix.apache.org by la...@apache.org on 2021/03/10 00:45:33 UTC
[phoenix] branch 5.1 updated: PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
This is an automated email from the ASF dual-hosted git repository.
larsh pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 0bbdfda PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
0bbdfda is described below
commit 0bbdfdae9a7d479de081b9b7511df96fa9cc5829
Author: Lars <la...@apache.org>
AuthorDate: Sat Mar 6 12:51:14 2021 -0800
PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
---
.../apache/phoenix/end2end/index/LocalIndexIT.java | 84 ++++++++++++++++++----
.../org/apache/phoenix/compile/WhereCompiler.java | 35 ++++++---
.../coprocessor/BaseScannerRegionObserver.java | 2 +
.../phoenix/iterate/BaseResultIterators.java | 33 ++++-----
.../org/apache/phoenix/iterate/ExplainTable.java | 11 ++-
.../phoenix/iterate/OrderedResultIterator.java | 5 ++
.../phoenix/iterate/RegionScannerFactory.java | 35 +++++++++
.../apache/phoenix/schema/types/PVarbinary.java | 2 +-
8 files changed, 165 insertions(+), 42 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 495a0a5..90d172e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -82,22 +82,77 @@ public class LocalIndexIT extends BaseLocalIndexIT {
@Test
public void testSelectFromIndexWithAdditionalWhereClause() throws Exception {
+ if (isNamespaceMapped) {
+ return;
+ }
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
Connection conn = getConnection();
conn.setAutoCommit(true);
- if (isNamespaceMapped) {
- conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
- }
- conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT)");
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT, v3 INTEGER)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 2, 3, 4)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(2, 3, 4, 5)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(3, 4, 5, 6)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(4, 5, 6, 7)");
+
conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 0.01, 1.0)");
- ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 0.1 and v2 < 10.0");
+ testExtraWhere(conn, tableName);
+
+ conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+ conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v3)");
+ testExtraWhere(conn, tableName);
+
+ conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+ conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2)");
+ testExtraWhere(conn, tableName);
+
+ conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+ conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2,v3)");
+ testExtraWhere(conn, tableName);
+ }
+
+ private void testExtraWhere(Connection conn, String tableName) throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v2 < 4");
+ rs.next();
+ assertEquals(1, rs.getInt(1));
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v3 < 5");
+ rs.next();
+ assertEquals(1, rs.getInt(1));
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 10 AND v2 < 0 AND v3 < 0");
+ rs.next();
+ assertEquals(0, rs.getInt(1));
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 <= 2 AND v2 > 0 AND v3 < 5");
rs.next();
assertEquals(1, rs.getInt(1));
rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6");
+ rs.next();
+ assertEquals(4, rs.getInt(1));
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT v1 FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6");
+ rs.next();
+ assertEquals(5, rs.getInt(1));
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))");
+ rs.next();
+ assertEquals(3, rs.getInt(1));
+ rs.close();
+
+ rs = conn.createStatement().executeQuery("SELECT v3 FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))");
+ rs.next();
+ assertEquals(6, rs.getInt(1));
+ rs.close();
}
@Test
@@ -232,13 +287,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
QueryUtil.getExplainPlan(rs));
rs.close();
- // 4. Longer prefix on the index.
- // Note: This cannot use the local index, see PHOENIX-6300
+ // 4. Longer prefix on the index, use it.
rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + tableName + " WHERE pk1 = 3 AND pk2 = 4 AND v1 = 3 AND v3 = 1");
assertEquals(
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
- + physicalTableName + " [3,4]\n"
- + " SERVER FILTER BY (V1 = 3.0 AND V3 = 1)",
+ + physicalTableName + " [1,3,4,3]\n"
+ + " SERVER FILTER BY FIRST KEY ONLY AND \"V3\" = 1\n"
+ + "CLIENT MERGE SORT",
QueryUtil.getExplainPlan(rs));
rs.close();
}
@@ -374,12 +429,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
QueryUtil.getExplainPlan(rs));
rs.close();
- // 10. Cannot use index even when also filtering on non-indexed column, see PHOENIX-6400
+ // 10. Use index even when also filtering on non-indexed column
rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + tableName + " WHERE v2 = 2 AND v1 = 3");
assertEquals(
- "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
- + indexPhysicalTableName + "\n"
- + " SERVER FILTER BY (V2 = 2.0 AND V1 = 3.0)",
+ "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
+ + indexPhysicalTableName + " [1,2]\n"
+ + " SERVER FILTER BY FIRST KEY ONLY AND \"V1\" = 3.0\n"
+ + "CLIENT MERGE SORT",
QueryUtil.getExplainPlan(rs));
rs.close();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 4789f0c..9439a3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.compile;
import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Collections;
@@ -30,11 +33,14 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.expression.AndExpression;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
@@ -201,16 +207,6 @@ public class WhereCompiler {
return ref;
}
PTable table = ref.getTable();
- // If current table in the context is local index and table in column reference is global that
- // means the column is not present in the local index. Local indexes do not currently support this.
- // Throwing this exception here will cause this plan to be ignored when enumerating possible plans
- // during the optimizing phase.
- if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL
- && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) {
- String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString();
- String tableNameStr = table.getTableName()==null?null:table.getTableName().getString();
- throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, ref.getColumn().getName().getString());
- }
// Track if we need to compare KeyValue during filter evaluation
// using column family. If the column qualifier is enough, we
// just use that.
@@ -282,6 +278,25 @@ public class WhereCompiler {
if (LiteralExpression.isBooleanFalseOrNull(whereClause)) {
context.setScanRanges(ScanRanges.NOTHING);
+ } else if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) {
+ if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
+ // pass any extra where as scan attribute so it can be evaluated after all
+ // columns from the main CF have been merged in
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(whereClause).ordinal());
+ whereClause.write(output);
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER, stream.toByteArray());
+
+ // this is needed just for ExplainTable, since de-serializing an expression does not restore
+ // its display properties, and that cannot be changed, due to backwards compatibility
+ scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR, Bytes.toBytes(whereClause.toString()));
+ }
} else if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
Filter filter = null;
final Counter counter = new Counter();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 8c1ff37..1244276 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -80,6 +80,8 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
"_IndexRebuildDisableLoggingVerifyType";
public static final String INDEX_REBUILD_DISABLE_LOGGING_BEYOND_MAXLOOKBACK_AGE =
"_IndexRebuildDisableLoggingBeyondMaxLookbackAge";
+ public static final String LOCAL_INDEX_FILTER = "_LocalIndexFilter";
+ public static final String LOCAL_INDEX_FILTER_STR = "_LocalIndexFilterStr";
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 8e0171a..ac8cce4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -404,6 +404,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null;
boolean filteredColumnNotInProjection = false;
+
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] filteredFamily = whereCol.getFirst();
if (!(familyMap.containsKey(filteredFamily))) {
@@ -443,22 +444,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
}
}
- for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
- ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
- NavigableSet<byte[]> qs = entry.getValue();
- NavigableSet<ImmutableBytesPtr> cols = null;
- if (qs != null) {
- cols = new TreeSet<ImmutableBytesPtr>();
- for (byte[] q : qs) {
- cols.add(new ImmutableBytesPtr(q));
- if (trackedColumnsBitset != null) {
- int qualifier = encodingScheme.decode(q);
- trackedColumnsBitset.set(qualifier);
- }
- }
- }
- columnsTracker.put(cf, cols);
- }
// Making sure that where condition CFs are getting scanned at HRS.
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] family = whereCol.getFirst();
@@ -491,6 +476,22 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
}
}
+ for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+ ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+ NavigableSet<byte[]> qs = entry.getValue();
+ NavigableSet<ImmutableBytesPtr> cols = null;
+ if (qs != null) {
+ cols = new TreeSet<ImmutableBytesPtr>();
+ for (byte[] q : qs) {
+ cols.add(new ImmutableBytesPtr(q));
+ if (trackedColumnsBitset != null) {
+ int qualifier = encodingScheme.decode(q);
+ trackedColumnsBitset.set(qualifier);
+ }
+ }
+ }
+ columnsTracker.put(cf, cols);
+ }
if (!columnsTracker.isEmpty()) {
if (preventSeekToColumn) {
for (ImmutableBytesPtr f : columnsTracker.keySet()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 31713d9..cf5e021 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -178,10 +178,19 @@ public abstract class ExplainTable {
}
} while (filterIterator.hasNext());
}
+ String whereFilterStr = null;
if (whereFilter != null) {
+ whereFilterStr = whereFilter.toString();
+ } else {
+ byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR);
+ if (expBytes != null) {
+ whereFilterStr = Bytes.toString(expBytes);
+ }
+ }
+ if (whereFilterStr != null) {
String serverWhereFilter = "SERVER FILTER BY "
+ (firstKeyOnlyFilter == null ? "" : "FIRST KEY ONLY AND ")
- + whereFilter.toString();
+ + whereFilterStr;
planSteps.add(" " + serverWhereFilter);
if (explainPlanAttributesBuilder != null) {
explainPlanAttributesBuilder.setServerWhereFilter(serverWhereFilter);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 670ced7..edc7cc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -281,6 +281,11 @@ public class OrderedResultIterator implements PeekingResultIterator {
final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries();
long startTime = EnvironmentEdgeManager.currentTimeMillis();
for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+ // result might be empty if it was filtered by a local index
+ if (result.size() == 0) {
+ continue;
+ }
+
if (isDummy(result)) {
dummyTuple = result;
return resultIterator;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 83e2290..20f68cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -41,9 +41,12 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
@@ -63,6 +66,8 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -124,6 +129,25 @@ public abstract class RegionScannerFactory {
private byte[] actualStartKey = getActualStartKey();
private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
+ Expression extraWhere = null;
+
+ {
+ // for local indexes construct the row filter for uncovered columns if it exists
+ if (ScanUtil.isLocalIndex(scan)) {
+ byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER);
+ if (expBytes != null) {
+ try {
+ ByteArrayInputStream stream = new ByteArrayInputStream(expBytes);
+ DataInputStream input = new DataInputStream(stream);
+ extraWhere = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+ extraWhere.readFields(input);
+ } catch (IOException io) {
+ // should not happen since we're reading from a byte[]
+ throw new RuntimeException(io);
+ }
+ }
+ }
+ }
// Get the actual scan start row of local index. This will be used to compare the row
// key of the results less than scan start row when there are references.
@@ -205,6 +229,17 @@ public abstract class RegionScannerFactory {
*/
IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+
+ if (extraWhere != null) {
+ Tuple merged = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
+ new ResultTuple(Result.create(result));
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ extraWhere.evaluate(merged, ptr);
+ if (!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) {
+ result.clear();
+ return next;
+ }
+ }
}
if (projector != null) {
Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
index e165a9c..1af460a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
@@ -142,7 +142,7 @@ public class PVarbinary extends PBinaryBase {
StringBuilder buf = new StringBuilder();
buf.append('[');
if (length > 0) {
- for (int i = o; i < length; i++) {
+ for (int i = o; i < o + length; i++) {
buf.append(0xFF & b[i]);
buf.append(',');
}