You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2021/03/10 00:48:22 UTC

[phoenix] branch 4.x updated: PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.

This is an automated email from the ASF dual-hosted git repository.

larsh pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 78a5ab0  PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
78a5ab0 is described below

commit 78a5ab06bb382a04974ca93c7a33aff0867c7132
Author: Lars <la...@apache.org>
AuthorDate: Sat Mar 6 12:51:14 2021 -0800

    PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
---
 .../apache/phoenix/end2end/index/LocalIndexIT.java | 84 ++++++++++++++++++----
 .../org/apache/phoenix/compile/WhereCompiler.java  | 35 ++++++---
 .../coprocessor/BaseScannerRegionObserver.java     |  2 +
 .../phoenix/iterate/BaseResultIterators.java       | 33 ++++-----
 .../org/apache/phoenix/iterate/ExplainTable.java   | 11 ++-
 .../phoenix/iterate/OrderedResultIterator.java     |  5 ++
 .../phoenix/iterate/RegionScannerFactory.java      | 35 +++++++++
 .../apache/phoenix/schema/types/PVarbinary.java    |  2 +-
 8 files changed, 165 insertions(+), 42 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 276395e..a3f3ed1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -81,22 +81,77 @@ public class LocalIndexIT extends BaseLocalIndexIT {
 
     @Test
     public void testSelectFromIndexWithAdditionalWhereClause() throws Exception {
+        if (isNamespaceMapped) {
+            return;
+        }
         String tableName = schemaName + "." + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
 
         Connection conn = getConnection();
         conn.setAutoCommit(true);
-        if (isNamespaceMapped) {
-            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-        }
 
-        conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT)");
+        conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT, v3 INTEGER)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 2, 3, 4)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(2, 3, 4, 5)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(3, 4, 5, 6)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(4, 5, 6, 7)");
+
         conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 0.01, 1.0)");
-        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 0.1 and v2 < 10.0");
+        testExtraWhere(conn, tableName);
+
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v3)");
+        testExtraWhere(conn, tableName);
+
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2)");
+        testExtraWhere(conn, tableName);
+
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2,v3)");
+        testExtraWhere(conn, tableName);
+    }
+
+    private void testExtraWhere(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v2 < 4");
+        rs.next();
+        assertEquals(1, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v3 < 5");
+        rs.next();
+        assertEquals(1, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 10 AND v2 < 0 AND v3 < 0");
+        rs.next();
+        assertEquals(0, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 <= 2 AND v2 > 0 AND v3 < 5");
         rs.next();
         assertEquals(1, rs.getInt(1));
         rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6");
+        rs.next();
+        assertEquals(4, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT v1 FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6");
+        rs.next();
+        assertEquals(5, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))");
+        rs.next();
+        assertEquals(3, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT v3 FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))");
+        rs.next();
+        assertEquals(6, rs.getInt(1));
+        rs.close();
     }
 
     @Test
@@ -231,13 +286,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
-        // 4. Longer prefix on the index.
-        // Note: This cannot use the local index, see PHOENIX-6300
+        // 4. Longer prefix on the index, use it.
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + tableName + " WHERE pk1 = 3 AND pk2 = 4 AND v1 = 3 AND v3 = 1");
         assertEquals(
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
-                    + physicalTableName + " [3,4]\n"
-                    + "    SERVER FILTER BY (V1 = 3.0 AND V3 = 1)",
+                    + physicalTableName + " [1,3,4,3]\n"
+                    + "    SERVER FILTER BY FIRST KEY ONLY AND \"V3\" = 1\n"
+                    + "CLIENT MERGE SORT",
                     QueryUtil.getExplainPlan(rs));
         rs.close();
     }
@@ -373,12 +428,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
-        // 10. Cannot use index even when also filtering on non-indexed column, see PHOENIX-6400
+        // 10. Use index even when also filtering on non-indexed column
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + tableName + " WHERE v2 = 2 AND v1 = 3");
         assertEquals(
-            "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
-                    + indexPhysicalTableName + "\n"
-                            + "    SERVER FILTER BY (V2 = 2.0 AND V1 = 3.0)",
+            "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
+                    + indexPhysicalTableName + " [1,2]\n"
+                    + "    SERVER FILTER BY FIRST KEY ONLY AND \"V1\" = 3.0\n"
+                    + "CLIENT MERGE SORT",
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 558be36..a157150 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.compile;
 
 import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.Collections;
@@ -30,11 +33,14 @@ import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
@@ -201,16 +207,6 @@ public class WhereCompiler {
                 return ref;
             }
             PTable table = ref.getTable();
-            // If current table in the context is local index and table in column reference is global that
-            // means the column is not present in the local index. Local indexes do not currently support this.
-            // Throwing this exception here will cause this plan to be ignored when enumerating possible plans
-            // during the optimizing phase.
-            if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL
-                    && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) {
-                String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString();
-                String tableNameStr = table.getTableName()==null?null:table.getTableName().getString();
-                throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, ref.getColumn().getName().getString());
-            }
             // Track if we need to compare KeyValue during filter evaluation
             // using column family. If the column qualifier is enough, we
             // just use that.
@@ -282,6 +278,25 @@ public class WhereCompiler {
 
         if (LiteralExpression.isBooleanFalseOrNull(whereClause)) {
             context.setScanRanges(ScanRanges.NOTHING);
+        } else if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) {
+            if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
+                // pass any extra where as scan attribute so it can be evaluated after all
+                // columns from the main CF have been merged in
+                ByteArrayOutputStream stream = new ByteArrayOutputStream();
+                try {
+                    DataOutputStream output = new DataOutputStream(stream);
+                    WritableUtils.writeVInt(output, ExpressionType.valueOf(whereClause).ordinal());
+                    whereClause.write(output);
+                    stream.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER, stream.toByteArray());
+
+                // this is needed just for ExplainTable, since de-serializing an expression does not restore
+                // its display properties, and that cannot be changed, due to backwards compatibility
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR, Bytes.toBytes(whereClause.toString()));
+            }
         } else if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
             Filter filter = null;
             final Counter counter = new Counter();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 2ad520f..7bb84f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -97,6 +97,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
         "_IndexRebuildDisableLoggingVerifyType";
     public static final String INDEX_REBUILD_DISABLE_LOGGING_BEYOND_MAXLOOKBACK_AGE =
         "_IndexRebuildDisableLoggingBeyondMaxLookbackAge";
+    public static final String LOCAL_INDEX_FILTER = "_LocalIndexFilter";
+    public static final String LOCAL_INDEX_FILTER_STR = "_LocalIndexFilterStr";
 
     /* 
     * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 3023ad9..9190a33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -405,6 +405,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
         BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null;
         boolean filteredColumnNotInProjection = false;
+
         for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
             byte[] filteredFamily = whereCol.getFirst();
             if (!(familyMap.containsKey(filteredFamily))) {
@@ -444,22 +445,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
             }
         }
-        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-            ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
-            NavigableSet<byte[]> qs = entry.getValue();
-            NavigableSet<ImmutableBytesPtr> cols = null;
-            if (qs != null) {
-                cols = new TreeSet<ImmutableBytesPtr>();
-                for (byte[] q : qs) {
-                    cols.add(new ImmutableBytesPtr(q));
-                    if (trackedColumnsBitset != null) {
-                        int qualifier = encodingScheme.decode(q);
-                        trackedColumnsBitset.set(qualifier);
-                    }
-                }
-            }
-            columnsTracker.put(cf, cols);
-        }
         // Making sure that where condition CFs are getting scanned at HRS.
         for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
             byte[] family = whereCol.getFirst();
@@ -492,6 +477,22 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
             }
         }
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+            NavigableSet<byte[]> qs = entry.getValue();
+            NavigableSet<ImmutableBytesPtr> cols = null;
+            if (qs != null) {
+                cols = new TreeSet<ImmutableBytesPtr>();
+                for (byte[] q : qs) {
+                    cols.add(new ImmutableBytesPtr(q));
+                    if (trackedColumnsBitset != null) {
+                        int qualifier = encodingScheme.decode(q);
+                        trackedColumnsBitset.set(qualifier);
+                    }
+                }
+            }
+            columnsTracker.put(cf, cols);
+        }
         if (!columnsTracker.isEmpty()) {
             if (preventSeekToColumn) {
                 for (ImmutableBytesPtr f : columnsTracker.keySet()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 31713d9..cf5e021 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -178,10 +178,19 @@ public abstract class ExplainTable {
                 }
             } while (filterIterator.hasNext());
         }
+        String whereFilterStr = null;
         if (whereFilter != null) {
+            whereFilterStr = whereFilter.toString();
+        } else {
+            byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR);
+            if (expBytes != null) {
+                whereFilterStr = Bytes.toString(expBytes);
+            }
+        }
+        if (whereFilterStr != null) {
             String serverWhereFilter = "SERVER FILTER BY "
                 + (firstKeyOnlyFilter == null ? "" : "FIRST KEY ONLY AND ")
-                + whereFilter.toString();
+                + whereFilterStr;
             planSteps.add("    " + serverWhereFilter);
             if (explainPlanAttributesBuilder != null) {
                 explainPlanAttributesBuilder.setServerWhereFilter(serverWhereFilter);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 13d75ea..bb0607c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -280,6 +280,11 @@ public class OrderedResultIterator implements PeekingResultIterator {
             final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries();
             long startTime = EnvironmentEdgeManager.currentTimeMillis();
             for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+                // result might be empty if it was filtered by a local index
+                if (result.size() == 0) {
+                    continue;
+                }
+
                 if (isDummy(result)) {
                     dummyTuple = result;
                     return resultIterator;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 5b587df..6be45d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -43,9 +43,12 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -65,6 +68,8 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -126,6 +131,25 @@ public abstract class RegionScannerFactory {
       private byte[] actualStartKey = getActualStartKey();
       private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
       final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
+      Expression extraWhere = null;
+
+      {
+          // for local indexes construct the row filter for uncovered columns if it exists
+          if (ScanUtil.isLocalIndex(scan)) {
+              byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER);
+              if (expBytes != null) {
+                  try {
+                      ByteArrayInputStream stream = new ByteArrayInputStream(expBytes);
+                      DataInputStream input = new DataInputStream(stream);
+                      extraWhere = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                      extraWhere.readFields(input);
+                  } catch (IOException io) {
+                      // should not happen since we're reading from a byte[]
+                      throw new RuntimeException(io);
+                  }
+              }
+          }
+      }
 
       // Get the actual scan start row of local index. This will be used to compare the row
       // key of the results less than scan start row when there are references.
@@ -207,6 +231,17 @@ public abstract class RegionScannerFactory {
              */
             IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
                 tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+
+            if (extraWhere != null) {
+                Tuple merged = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
+                    new ResultTuple(Result.create(result));
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                extraWhere.evaluate(merged, ptr);
+                if (!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) {
+                    result.clear();
+                    return next;
+                }
+            }
           }
           if (projector != null) {
             Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
index b3ce57a..83909c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
@@ -142,7 +142,7 @@ public class PVarbinary extends PBinaryBase {
         StringBuilder buf = new StringBuilder();
         buf.append('[');
         if (length > 0) {
-            for (int i = o; i < length; i++) {
+            for (int i = o; i < o + length; i++) {
                 buf.append(0xFF & b[i]);
                 buf.append(',');
             }