Posted to commits@phoenix.apache.org by ya...@apache.org on 2021/03/12 06:06:22 UTC

[phoenix] branch 4.16 updated (f8850ad -> c619d8e)

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a change to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from f8850ad  Update version to 4.16.1-SNAPSHOT
     new 28816e2  PHOENIX-6386 Bulkload generates unverified index rows
     new 7c22d20  PHOENIX-6396 PChar illegal data exception should not contain value
     new 2d77217  PHOENIX-6370 4.x branch still includes the phoenix-pig example files
     new d1cd37f  PHOENIX-6388 Add sampled logging for read repairs
     new 9b6b0fc  PHOENIX-6400 Do no use local index with uncovered columns in the WHERE clause.
     new ad433a0  PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
     new c619d8e  PHOENIX-6408 LIMIT on local index query with uncovered columns in the WHERE returns wrong result.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 examples/pig/test.pig                              | 19 -----
 examples/pig/testdata                              | 18 ----
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 39 ++++++++-
 .../apache/phoenix/end2end/index/LocalIndexIT.java | 96 +++++++++++++++++++++-
 .../org/apache/phoenix/compile/WhereCompiler.java  | 26 ++++++
 .../coprocessor/BaseScannerRegionObserver.java     |  3 +
 .../apache/phoenix/index/GlobalIndexChecker.java   | 27 ++++++
 .../phoenix/iterate/BaseResultIterators.java       | 42 ++++++----
 .../org/apache/phoenix/iterate/ExplainTable.java   | 22 ++++-
 .../phoenix/iterate/OrderedResultIterator.java     |  5 ++
 .../phoenix/iterate/RegionScannerFactory.java      | 43 ++++++++++
 .../mapreduce/FormatToBytesWritableMapper.java     | 61 ++++++++++++--
 .../ImportPreUpsertKeyValueProcessor.java          |  4 +-
 .../org/apache/phoenix/schema/types/PChar.java     | 10 +--
 .../apache/phoenix/schema/types/PVarbinary.java    |  2 +-
 .../apache/phoenix/compile/QueryCompilerTest.java  |  4 +-
 16 files changed, 346 insertions(+), 75 deletions(-)
 delete mode 100644 examples/pig/test.pig
 delete mode 100644 examples/pig/testdata


[phoenix] 03/07: PHOENIX-6370 4.x branch still includes the phoenix-pig example files

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 2d77217993acc15c38ea282f18a0a5c10b51a2eb
Author: Istvan Toth <st...@apache.org>
AuthorDate: Tue Feb 23 06:45:45 2021 +0100

    PHOENIX-6370 4.x branch still includes the phoenix-pig example files
---
 examples/pig/test.pig | 19 -------------------
 examples/pig/testdata | 18 ------------------
 2 files changed, 37 deletions(-)

diff --git a/examples/pig/test.pig b/examples/pig/test.pig
deleted file mode 100644
index 6835d00..0000000
--- a/examples/pig/test.pig
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-A = load 'examples/pig/testdata' as (a:chararray, b:chararray, c:int, d:chararray, e: datetime) ;
-STORE A into 'hbase://TESTPHX' using org.apache.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 1000');
diff --git a/examples/pig/testdata b/examples/pig/testdata
deleted file mode 100644
index 15f3f0b..0000000
--- a/examples/pig/testdata
+++ /dev/null
@@ -1,18 +0,0 @@
-00D300000000XHP	124	123456	weq	2012-12-12
-00D300000000XHP	111	123456	nab	2012-01-21
-00D300000000UIH	101	123456	ben	2014-01-01
-00D300000000XHP	124	123456	weq	2012-12-12
-00D300000000XHP	111	123456	nab	2012-01-21
-00D300000000UIH	101	123456	ben	2014-01-01
-00D300000000XHP	124	123456	weq	2012-12-12
-00D300000000XHP	111	123456	nab	2012-01-21
-00D300000000UIH	101	123456	ben	2014-01-01
-00D300000000XHP	124	123456	weq	2012-12-12
-00D300000000XHP	111	123456	nab	2012-01-21
-00D300000000UIH	101	123456	ben	2014-01-01
-00D300000000XHP	124	123456	weq	2012-12-12
-00D300000000XHP	111	123456	nab	2012-01-21
-00D300000000UIH	101	123456	ben	2014-01-01
-00D300000000XHP	124	123456	weq	2012-12-12
-00D300000000XHP	111	123456	nab	2012-01-21
-00D300000000UIH	101	123456	ben	2014-01-01


[phoenix] 06/07: PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit ad433a037980c403442e9776fbc950e675fbfa9f
Author: Lars <la...@apache.org>
AuthorDate: Sat Mar 6 12:51:14 2021 -0800

    PHOENIX-6402 Allow using local indexes with uncovered columns in the WHERE clause.
---
 .../apache/phoenix/end2end/index/LocalIndexIT.java | 84 ++++++++++++++++++----
 .../org/apache/phoenix/compile/WhereCompiler.java  | 35 ++++++---
 .../coprocessor/BaseScannerRegionObserver.java     |  2 +
 .../phoenix/iterate/BaseResultIterators.java       | 33 ++++-----
 .../org/apache/phoenix/iterate/ExplainTable.java   | 11 ++-
 .../phoenix/iterate/OrderedResultIterator.java     |  5 ++
 .../phoenix/iterate/RegionScannerFactory.java      | 35 +++++++++
 .../apache/phoenix/schema/types/PVarbinary.java    |  2 +-
 8 files changed, 165 insertions(+), 42 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 276395e..a3f3ed1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -81,22 +81,77 @@ public class LocalIndexIT extends BaseLocalIndexIT {
 
     @Test
     public void testSelectFromIndexWithAdditionalWhereClause() throws Exception {
+        if (isNamespaceMapped) {
+            return;
+        }
         String tableName = schemaName + "." + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
 
         Connection conn = getConnection();
         conn.setAutoCommit(true);
-        if (isNamespaceMapped) {
-            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-        }
 
-        conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT)");
+        conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT, v3 INTEGER)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 2, 3, 4)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(2, 3, 4, 5)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(3, 4, 5, 6)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(4, 5, 6, 7)");
+
         conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 0.01, 1.0)");
-        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 0.1 and v2 < 10.0");
+        testExtraWhere(conn, tableName);
+
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v3)");
+        testExtraWhere(conn, tableName);
+
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2)");
+        testExtraWhere(conn, tableName);
+
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE (v2,v3)");
+        testExtraWhere(conn, tableName);
+    }
+
+    private void testExtraWhere(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v2 < 4");
+        rs.next();
+        assertEquals(1, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 3 AND v3 < 5");
+        rs.next();
+        assertEquals(1, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 10 AND v2 < 0 AND v3 < 0");
+        rs.next();
+        assertEquals(0, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 <= 2 AND v2 > 0 AND v3 < 5");
         rs.next();
         assertEquals(1, rs.getInt(1));
         rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6");
+        rs.next();
+        assertEquals(4, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT v1 FROM "+tableName+" WHERE v1 > 3 AND v2 > 0 AND v3 > 6");
+        rs.next();
+        assertEquals(5, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT pk FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))");
+        rs.next();
+        assertEquals(3, rs.getInt(1));
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT v3 FROM "+tableName+" WHERE (v1,v2) IN ((1,5),(4,5))");
+        rs.next();
+        assertEquals(6, rs.getInt(1));
+        rs.close();
     }
 
     @Test
@@ -231,13 +286,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
-        // 4. Longer prefix on the index.
-        // Note: This cannot use the local index, see PHOENIX-6300
+        // 4. Longer prefix on the index, use it.
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + tableName + " WHERE pk1 = 3 AND pk2 = 4 AND v1 = 3 AND v3 = 1");
         assertEquals(
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
-                    + physicalTableName + " [3,4]\n"
-                    + "    SERVER FILTER BY (V1 = 3.0 AND V3 = 1)",
+                    + physicalTableName + " [1,3,4,3]\n"
+                    + "    SERVER FILTER BY FIRST KEY ONLY AND \"V3\" = 1\n"
+                    + "CLIENT MERGE SORT",
                     QueryUtil.getExplainPlan(rs));
         rs.close();
     }
@@ -373,12 +428,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
-        // 10. Cannot use index even when also filtering on non-indexed column, see PHOENIX-6400
+        // 10. Use index even when also filtering on non-indexed column
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + tableName + " WHERE v2 = 2 AND v1 = 3");
         assertEquals(
-            "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
-                    + indexPhysicalTableName + "\n"
-                            + "    SERVER FILTER BY (V2 = 2.0 AND V1 = 3.0)",
+            "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
+                    + indexPhysicalTableName + " [1,2]\n"
+                    + "    SERVER FILTER BY FIRST KEY ONLY AND \"V1\" = 3.0\n"
+                    + "CLIENT MERGE SORT",
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 558be36..a157150 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.compile;
 
 import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.Collections;
@@ -30,11 +33,14 @@ import com.google.common.base.Optional;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
@@ -201,16 +207,6 @@ public class WhereCompiler {
                 return ref;
             }
             PTable table = ref.getTable();
-            // If current table in the context is local index and table in column reference is global that
-            // means the column is not present in the local index. Local indexes do not currently support this.
-            // Throwing this exception here will cause this plan to be ignored when enumerating possible plans
-            // during the optimizing phase.
-            if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL
-                    && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) {
-                String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString();
-                String tableNameStr = table.getTableName()==null?null:table.getTableName().getString();
-                throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, ref.getColumn().getName().getString());
-            }
             // Track if we need to compare KeyValue during filter evaluation
             // using column family. If the column qualifier is enough, we
             // just use that.
@@ -282,6 +278,25 @@ public class WhereCompiler {
 
         if (LiteralExpression.isBooleanFalseOrNull(whereClause)) {
             context.setScanRanges(ScanRanges.NOTHING);
+        } else if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) {
+            if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
+                // pass any extra where as scan attribute so it can be evaluated after all
+                // columns from the main CF have been merged in
+                ByteArrayOutputStream stream = new ByteArrayOutputStream();
+                try {
+                    DataOutputStream output = new DataOutputStream(stream);
+                    WritableUtils.writeVInt(output, ExpressionType.valueOf(whereClause).ordinal());
+                    whereClause.write(output);
+                    stream.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER, stream.toByteArray());
+
+                // this is needed just for ExplainTable, since de-serializing an expression does not restore
+                // its display properties, and that cannot be changed, due to backwards compatibility
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR, Bytes.toBytes(whereClause.toString()));
+            }
         } else if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
             Filter filter = null;
             final Counter counter = new Counter();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 2ad520f..7bb84f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -97,6 +97,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
         "_IndexRebuildDisableLoggingVerifyType";
     public static final String INDEX_REBUILD_DISABLE_LOGGING_BEYOND_MAXLOOKBACK_AGE =
         "_IndexRebuildDisableLoggingBeyondMaxLookbackAge";
+    public static final String LOCAL_INDEX_FILTER = "_LocalIndexFilter";
+    public static final String LOCAL_INDEX_FILTER_STR = "_LocalIndexFilterStr";
 
     /* 
     * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 3023ad9..9190a33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -405,6 +405,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
         BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null;
         boolean filteredColumnNotInProjection = false;
+
         for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
             byte[] filteredFamily = whereCol.getFirst();
             if (!(familyMap.containsKey(filteredFamily))) {
@@ -444,22 +445,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
             }
         }
-        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-            ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
-            NavigableSet<byte[]> qs = entry.getValue();
-            NavigableSet<ImmutableBytesPtr> cols = null;
-            if (qs != null) {
-                cols = new TreeSet<ImmutableBytesPtr>();
-                for (byte[] q : qs) {
-                    cols.add(new ImmutableBytesPtr(q));
-                    if (trackedColumnsBitset != null) {
-                        int qualifier = encodingScheme.decode(q);
-                        trackedColumnsBitset.set(qualifier);
-                    }
-                }
-            }
-            columnsTracker.put(cf, cols);
-        }
         // Making sure that where condition CFs are getting scanned at HRS.
         for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
             byte[] family = whereCol.getFirst();
@@ -492,6 +477,22 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
             }
         }
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
+            NavigableSet<byte[]> qs = entry.getValue();
+            NavigableSet<ImmutableBytesPtr> cols = null;
+            if (qs != null) {
+                cols = new TreeSet<ImmutableBytesPtr>();
+                for (byte[] q : qs) {
+                    cols.add(new ImmutableBytesPtr(q));
+                    if (trackedColumnsBitset != null) {
+                        int qualifier = encodingScheme.decode(q);
+                        trackedColumnsBitset.set(qualifier);
+                    }
+                }
+            }
+            columnsTracker.put(cf, cols);
+        }
         if (!columnsTracker.isEmpty()) {
             if (preventSeekToColumn) {
                 for (ImmutableBytesPtr f : columnsTracker.keySet()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index 31713d9..cf5e021 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -178,10 +178,19 @@ public abstract class ExplainTable {
                 }
             } while (filterIterator.hasNext());
         }
+        String whereFilterStr = null;
         if (whereFilter != null) {
+            whereFilterStr = whereFilter.toString();
+        } else {
+            byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER_STR);
+            if (expBytes != null) {
+                whereFilterStr = Bytes.toString(expBytes);
+            }
+        }
+        if (whereFilterStr != null) {
             String serverWhereFilter = "SERVER FILTER BY "
                 + (firstKeyOnlyFilter == null ? "" : "FIRST KEY ONLY AND ")
-                + whereFilter.toString();
+                + whereFilterStr;
             planSteps.add("    " + serverWhereFilter);
             if (explainPlanAttributesBuilder != null) {
                 explainPlanAttributesBuilder.setServerWhereFilter(serverWhereFilter);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 13d75ea..bb0607c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -280,6 +280,11 @@ public class OrderedResultIterator implements PeekingResultIterator {
             final SizeAwareQueue<ResultEntry> queueEntries = ((RecordPeekingResultIterator)resultIterator).getQueueEntries();
             long startTime = EnvironmentEdgeManager.currentTimeMillis();
             for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
+                // result might be empty if it was filtered by a local index
+                if (result.size() == 0) {
+                    continue;
+                }
+
                 if (isDummy(result)) {
                     dummyTuple = result;
                     return resultIterator;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 5b587df..6be45d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -43,9 +43,12 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -65,6 +68,8 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -126,6 +131,25 @@ public abstract class RegionScannerFactory {
       private byte[] actualStartKey = getActualStartKey();
       private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
       final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
+      Expression extraWhere = null;
+
+      {
+          // for local indexes construct the row filter for uncovered columns if it exists
+          if (ScanUtil.isLocalIndex(scan)) {
+              byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER);
+              if (expBytes != null) {
+                  try {
+                      ByteArrayInputStream stream = new ByteArrayInputStream(expBytes);
+                      DataInputStream input = new DataInputStream(stream);
+                      extraWhere = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                      extraWhere.readFields(input);
+                  } catch (IOException io) {
+                      // should not happen since we're reading from a byte[]
+                      throw new RuntimeException(io);
+                  }
+              }
+          }
+      }
 
       // Get the actual scan start row of local index. This will be used to compare the row
       // key of the results less than scan start row when there are references.
@@ -207,6 +231,17 @@ public abstract class RegionScannerFactory {
              */
             IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
                 tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+
+            if (extraWhere != null) {
+                Tuple merged = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
+                    new ResultTuple(Result.create(result));
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                extraWhere.evaluate(merged, ptr);
+                if (!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) {
+                    result.clear();
+                    return next;
+                }
+            }
           }
           if (projector != null) {
             Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) :
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
index b3ce57a..83909c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
@@ -142,7 +142,7 @@ public class PVarbinary extends PBinaryBase {
         StringBuilder buf = new StringBuilder();
         buf.append('[');
         if (length > 0) {
-            for (int i = o; i < length; i++) {
+            for (int i = o; i < o + length; i++) {
                 buf.append(0xFF & b[i]);
                 buf.append(',');
             }

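The WhereCompiler and RegionScannerFactory hunks above ship the leftover WHERE expression to the region server as a scan attribute: a type ordinal is written first, then the expression's own fields, and the server uses the ordinal to re-create an instance before reading the fields back. A minimal, self-contained sketch of that tag-then-payload round trip follows. It is not Phoenix code: `Kind` and `Predicate` are hypothetical stand-ins for `ExpressionType` and `Expression`, and plain `writeInt`/`readInt` stands in for Hadoop's `WritableUtils.writeVInt`/`readVInt`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TaggedFilterSketch {
    // Hypothetical stand-in for ExpressionType: the ordinal tells the reader what to re-create.
    enum Kind { LESS_THAN, GREATER_THAN }

    // Hypothetical stand-in for an Expression of the form "column <op> bound".
    static final class Predicate {
        final Kind kind;
        final String column;
        final int bound;

        Predicate(Kind kind, String column, int bound) {
            this.kind = kind;
            this.column = column;
            this.bound = bound;
        }

        boolean matches(int value) {
            return kind == Kind.LESS_THAN ? value < bound : value > bound;
        }
    }

    // Client side: write the type tag first, then the payload.
    static byte[] serialize(Predicate p) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(stream)) {
            out.writeInt(p.kind.ordinal()); // the commit writes a VInt here instead
            out.writeUTF(p.column);
            out.writeInt(p.bound);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return stream.toByteArray();
    }

    // Server side: read the tag, pick the type, then read the payload in the same order.
    static Predicate deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            Kind kind = Kind.values()[in.readInt()];
            String column = in.readUTF();
            int bound = in.readInt();
            return new Predicate(kind, column, bound);
        } catch (IOException e) {
            // Not expected when reading from an in-memory byte[].
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Predicate received = deserialize(serialize(new Predicate(Kind.LESS_THAN, "V2", 4)));
        System.out.println(received.column + " accepts 3: " + received.matches(3));
        System.out.println(received.column + " accepts 5: " + received.matches(5));
    }
}
```

Because the tag is an enum ordinal, a scheme like this only works while writer and reader agree on the order of the enum constants.
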

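The one-line PVarbinary change in the commit above replaces the loop bound `i < length` with `i < o + length`, so that `length` is treated as a count of bytes starting at offset `o` rather than as an absolute end index. The sketch below isolates the two bounds to show the difference. It is an illustration only: the class and method names are hypothetical, and the trailing-comma handling is an assumption about the surrounding code, which the diff does not show.

```java
public class SliceFormatSketch {
    // Pre-fix bound: `length` is (incorrectly) used as an absolute end index.
    static String formatOld(byte[] b, int o, int length) {
        StringBuilder buf = new StringBuilder("[");
        for (int i = o; i < length; i++) {
            buf.append(0xFF & b[i]).append(',');
        }
        return close(buf);
    }

    // Post-fix bound: iterate over exactly `length` bytes starting at offset `o`.
    static String formatNew(byte[] b, int o, int length) {
        StringBuilder buf = new StringBuilder("[");
        for (int i = o; i < o + length; i++) {
            buf.append(0xFF & b[i]).append(',');
        }
        return close(buf);
    }

    // Drop the trailing comma, if any, and close the bracket (assumed formatting).
    private static String close(StringBuilder buf) {
        if (buf.length() > 1) {
            buf.setLength(buf.length() - 1);
        }
        return buf.append(']').toString();
    }

    public static void main(String[] args) {
        byte[] data = {10, 20, 30, 40, 50};
        System.out.println(formatOld(data, 2, 2)); // prints "[]"
        System.out.println(formatNew(data, 2, 2)); // prints "[30,40]"
    }
}
```

With a zero offset the two bounds coincide, which is presumably why the old loop went unnoticed until a value was formatted from the middle of a larger buffer.
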
[phoenix] 02/07: PHOENIX-6396 PChar illegal data exception should not contain value

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 7c22d206a5ea0a06099082447a30ed59c353f8a8
Author: Xinyi Yan <ya...@apache.org>
AuthorDate: Mon Feb 22 17:02:53 2021 -0800

    PHOENIX-6396 PChar illegal data exception should not contain value
---
 .../src/main/java/org/apache/phoenix/schema/types/PChar.java   | 10 +++++-----
 .../java/org/apache/phoenix/compile/QueryCompilerTest.java     |  4 +++-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PChar.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PChar.java
index 017e813..f6d9a10 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PChar.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PChar.java
@@ -85,7 +85,7 @@ public class PChar extends PDataType<String> {
       }
       byte[] b = PVarchar.INSTANCE.toBytes(object);
       if (b.length != ((String) object).length()) {
-        throw newIllegalDataException("CHAR types may only contain single byte characters (" + object + ")");
+        throw newIllegalDataException("CHAR types may only contain single byte characters.");
       }
       return b;
     }
@@ -97,7 +97,7 @@ public class PChar extends PDataType<String> {
       }
       int len = PVarchar.INSTANCE.toBytes(object, bytes, offset);
       if (len != ((String) object).length()) {
-        throw newIllegalDataException("CHAR types may only contain single byte characters (" + object + ")");
+        throw newIllegalDataException("CHAR types may only contain single byte characters.");
       }
       return len;
     }
@@ -118,7 +118,7 @@ public class PChar extends PDataType<String> {
       // TODO: UTF-8 decoder that will invert as it decodes
       String s = Bytes.toString(bytes, offset, length);
       if (length != s.length()) {
-        throw newIllegalDataException("CHAR types may only contain single byte characters (" + s + ")");
+        throw newIllegalDataException("CHAR types may only contain single byte characters.");
       }
       return s;
     }
@@ -142,7 +142,7 @@ public class PChar extends PDataType<String> {
         Integer actualMaxLength, Integer actualScale, SortOrder actualModifier,
         Integer desiredMaxLength, Integer desiredScale, SortOrder expectedModifier) {
       if (o != null && actualType.equals(PVarchar.INSTANCE) && ((String)o).length() != ptr.getLength()) {
-        throw newIllegalDataException("CHAR types may only contain single byte characters (" + o + ")");
+        throw newIllegalDataException("CHAR types may only contain single byte characters.");
       }
       super.coerceBytes(ptr, o, actualType, actualMaxLength, actualScale, actualModifier, desiredMaxLength, desiredScale, expectedModifier);
       if (ptr.getLength() > 0 && desiredMaxLength != null &&
@@ -201,7 +201,7 @@ public class PChar extends PDataType<String> {
     @Override
     public Object toObject(String value) {
       if (StringUtil.hasMultiByteChars(value)) {
-        throw newIllegalDataException("CHAR types may only contain single byte characters (" + value + ")");
+        throw newIllegalDataException("CHAR types may only contain single byte characters.");
       }
       return value;
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index ac32956..7a450a8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -631,6 +631,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
 
     @Test
     public void testUpsertMultiByteIntoChar() throws Exception {
+        String value = "繰り返し曜日マスク";
         try {
             // Select non agg column in aggregate query
             String query = "upsert into ATABLE VALUES (?, ?, ?)";
@@ -639,7 +640,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             try {
                 PreparedStatement statement = conn.prepareStatement(query);
                 statement.setString(1, "00D300000000XHP");
-                statement.setString(2, "繰り返し曜日マスク");
+                statement.setString(2, value);
                 statement.setInt(3, 1);
                 statement.executeUpdate();
                 fail();
@@ -649,6 +650,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         } catch (SQLException e) {
             assertTrue(e.getMessage(), e.getMessage().contains("ERROR 201 (22000): Illegal data."));
             assertTrue(e.getMessage().contains("CHAR types may only contain single byte characters"));
+            assertFalse(e.getMessage().contains(value));
         }
     }
 

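Note on the PChar change above: the three call sites now raise the same fixed message and no longer append the offending value, which keeps user data out of exception text and logs. The following is a minimal sketch of that pattern, not the Phoenix implementation. The class name CharValidationSketch and its hasMultiByteChars helper are assumptions for illustration (the helper stands in for StringUtil.hasMultiByteChars and uses the same length comparison as the first hunk), and IllegalArgumentException stands in for Phoenix's illegal-data exception.

```java
import java.nio.charset.StandardCharsets;

final class CharValidationSketch {
    static final String MESSAGE = "CHAR types may only contain single byte characters.";

    // Hypothetical stand-in for StringUtil.hasMultiByteChars: true when the
    // UTF-8 encoding is longer than the number of Java chars in the string.
    static boolean hasMultiByteChars(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length != value.length();
    }

    // Returns the value unchanged, or rejects it without echoing it back.
    static String toCharValue(String value) {
        if (hasMultiByteChars(value)) {
            // Deliberately no "(" + value + ")" suffix in the message.
            throw new IllegalArgumentException(MESSAGE);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(toCharValue("00D300000000XHP"));
        try {
            toCharValue("繰り返し曜日マスク");
        } catch (IllegalArgumentException e) {
            // The message names the rule that was violated, not the data.
            System.out.println(e.getMessage());
        }
    }
}
```

The new assertFalse in testUpsertMultiByteIntoChar checks the same property: the message must not contain the rejected value.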

[phoenix] 05/07: PHOENIX-6400 Do no use local index with uncovered columns in the WHERE clause.

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 9b6b0fc31a7b52eccf18f7ca709c2550a3a1ef52
Author: Lars <la...@apache.org>
AuthorDate: Wed Mar 3 13:01:07 2021 -0800

    PHOENIX-6400 Do no use local index with uncovered columns in the WHERE clause.
---
 .../apache/phoenix/end2end/index/LocalIndexIT.java | 37 ++++++++++++++++------
 .../org/apache/phoenix/compile/WhereCompiler.java  | 11 +++++++
 2 files changed, 39 insertions(+), 9 deletions(-)
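
Note: the WhereCompiler comment in this patch says that throwing ColumnNotFoundException causes the local-index plan to be ignored while the optimizer enumerates possible plans. The sketch below illustrates that general "reject a candidate by throwing" idea only. It is not Phoenix's optimizer: PlanEnumerationSketch, Candidate, MissingColumnException, compile and viablePlans are all hypothetical names, and the column sets mirror the table and index from testSelectFromIndexWithAdditionalWhereClause.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PlanEnumerationSketch {
    // Hypothetical stand-in for Phoenix's ColumnNotFoundException.
    static final class MissingColumnException extends Exception {
        MissingColumnException(String column) {
            super("Undefined column: " + column);
        }
    }

    // Hypothetical candidate: a table or index name plus the columns it can serve.
    static final class Candidate {
        final String name;
        final Set<String> columns;

        Candidate(String name, String... columns) {
            this.name = name;
            this.columns = new HashSet<>(Arrays.asList(columns));
        }
    }

    // Compiling fails as soon as the WHERE clause references a column
    // that the candidate does not contain.
    static String compile(Candidate candidate, List<String> whereColumns)
            throws MissingColumnException {
        for (String column : whereColumns) {
            if (!candidate.columns.contains(column)) {
                throw new MissingColumnException(column);
            }
        }
        return "SCAN OVER " + candidate.name;
    }

    // Candidates whose compilation throws are skipped; the rest stay in play.
    static List<String> viablePlans(List<Candidate> candidates, List<String> whereColumns) {
        List<String> plans = new ArrayList<>();
        for (Candidate candidate : candidates) {
            try {
                plans.add(compile(candidate, whereColumns));
            } catch (MissingColumnException e) {
                // This candidate cannot answer the query; ignore it.
            }
        }
        return plans;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
                new Candidate("T", "pk", "v1", "v2"),   // data table
                new Candidate("IDX", "pk", "v1"));      // local index on v1
        // WHERE v1 < 0.1 AND v2 < 10.0 references v2, which IDX does not cover.
        System.out.println(viablePlans(candidates, Arrays.asList("v1", "v2")));
    }
}
```

With the uncovered column v2 in the WHERE clause only the data-table candidate survives, which matches the updated EXPLAIN expectations in LocalIndexIT.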

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 14e85ab..276395e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -80,6 +80,26 @@ public class LocalIndexIT extends BaseLocalIndexIT {
     }
 
     @Test
+    public void testSelectFromIndexWithAdditionalWhereClause() throws Exception {
+        String tableName = schemaName + "." + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+
+        Connection conn = getConnection();
+        conn.setAutoCommit(true);
+        if (isNamespaceMapped) {
+            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+        }
+
+        conn.createStatement().execute("CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, v1 FLOAT, v2 FLOAT)");
+        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES(1, 0.01, 1.0)");
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM "+tableName+" WHERE v1 < 0.1 and v2 < 10.0");
+        rs.next();
+        assertEquals(1, rs.getInt(1));
+        rs.close();
+    }
+
+    @Test
     public void testDeleteFromLocalIndex() throws Exception {
         String tableName = schemaName + "." + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
@@ -211,13 +231,13 @@ public class LocalIndexIT extends BaseLocalIndexIT {
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
-        // 4. Longer prefix on the index, use it.
+        // 4. Longer prefix on the index.
+        // Note: This cannot use the local index, see PHOENIX-6400
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT v2 FROM " + tableName + " WHERE pk1 = 3 AND pk2 = 4 AND v1 = 3 AND v3 = 1");
         assertEquals(
             "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
-                    + physicalTableName + " [1,3,4,3]\n"
-                    + "    SERVER FILTER BY FIRST KEY ONLY AND \"V3\" = 1\n"
-                    + "CLIENT MERGE SORT",
+                    + physicalTableName + " [3,4]\n"
+                    + "    SERVER FILTER BY (V1 = 3.0 AND V3 = 1)",
                     QueryUtil.getExplainPlan(rs));
         rs.close();
     }
@@ -353,13 +373,12 @@ public class LocalIndexIT extends BaseLocalIndexIT {
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
-        // 10. Use index even when also filtering on non-indexed column
+        // 10. Cannot use the index when also filtering on a non-indexed column, see PHOENIX-6400
         rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + tableName + " WHERE v2 = 2 AND v1 = 3");
         assertEquals(
-            "CLIENT PARALLEL 1-WAY RANGE SCAN OVER "
-                    + indexPhysicalTableName + " [1,2]\n"
-                            + "    SERVER FILTER BY FIRST KEY ONLY AND \"V1\" = 3.0\n"
-                            + "CLIENT MERGE SORT",
+            "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
+                    + indexPhysicalTableName + "\n"
+                            + "    SERVER FILTER BY (V2 = 2.0 AND V1 = 3.0)",
                     QueryUtil.getExplainPlan(rs));
         rs.close();
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index a5fd6c3..558be36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableType;
@@ -200,6 +201,16 @@ public class WhereCompiler {
                 return ref;
             }
             PTable table = ref.getTable();
+            // If the current table in the context is a local index and the table in the column reference
+            // is global, the column is not present in the local index. Local indexes do not currently
+            // support this. Throwing this exception here causes this plan to be ignored when enumerating
+            // possible plans during the optimizing phase.
+            if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL
+                    && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) {
+                String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString();
+                String tableNameStr = table.getTableName()==null?null:table.getTableName().getString();
+                throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, ref.getColumn().getName().getString());
+            }
             // Track if we need to compare KeyValue during filter evaluation
             // using column family. If the column qualifier is enough, we
             // just use that.


[phoenix] 01/07: PHOENIX-6386 Bulkload generates unverified index rows

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 28816e2086699302780a4f4918ae7a2071182cc1
Author: Istvan Toth <st...@apache.org>
AuthorDate: Thu Feb 18 11:36:31 2021 +0100

    PHOENIX-6386 Bulkload generates unverified index rows
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 39 +++++++++++++-
 .../mapreduce/FormatToBytesWritableMapper.java     | 61 +++++++++++++++++++---
 .../ImportPreUpsertKeyValueProcessor.java          |  4 +-
 3 files changed, 94 insertions(+), 10 deletions(-)
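
Note: IndexStatusUpdater in this patch marks global index rows as verified by overwriting the single value byte of the empty cell directly in the cell's backing array, rather than building new KeyValue objects. A minimal sketch of that in-place update follows, without any HBase types. InPlaceValueUpdateSketch and its method are hypothetical, and the UNVERIFIED and VERIFIED byte values are placeholders chosen for this sketch; the patch itself takes the value from IndexRegionObserver.VERIFIED_BYTES.

```java
final class InPlaceValueUpdateSketch {
    // Placeholder marker values for this sketch only (assumption, not Phoenix's constants).
    static final byte UNVERIFIED = 2;
    static final byte VERIFIED = 1;

    // Overwrites a one-byte value that lives at valueOffset inside a shared
    // backing array, the way a cell exposes its value array plus offset and length.
    static void setVerified(byte[] backing, int valueOffset, int valueLength) {
        if (valueLength != 1) {
            // Same fail-fast guard as the patch: the marker must be exactly one byte.
            throw new IllegalArgumentException("Empty cell value length is not 1");
        }
        backing[valueOffset] = VERIFIED;
    }

    public static void main(String[] args) {
        // Pretend bytes 0-1 and 3 belong to the key and other fields; byte 2 is the value.
        byte[] backing = {9, 9, UNVERIFIED, 9};
        setVerified(backing, 2, 1);
        System.out.println(java.util.Arrays.toString(backing));
    }
}
```

Writing through the backing array avoids allocating a new cell per index row. The trade-off is that the caller's list is mutated, which the patch documents with "@param keyValues will be modified".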

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 2de2c82..762d613 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -21,13 +21,16 @@ import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 
@@ -36,16 +39,25 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -318,6 +330,27 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
 
         rs.close();
         stmt.close();
+
+        checkIndexTableIsVerified("TABLE3_IDX");
+    }
+
+    private void checkIndexTableIsVerified(String indexTableName) throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Table hTable = cqs.getTable(Bytes.toBytes(indexTableName));
+        PTable pTable = PhoenixRuntime.getTable(conn, indexTableName);
+
+        byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(pTable);
+        byte[] emptyKeyValueQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(pTable).getFirst();
+
+        Scan scan = new Scan();
+        scan.setFilter(new SingleColumnValueFilter(
+                emptyKeyValueCF,
+                emptyKeyValueQualifier,
+                CompareOp.NOT_EQUAL,
+                IndexRegionObserver.VERIFIED_BYTES));
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            assertNull("There are non VERIFIED rows in index", scanner.next());
+        }
     }
 
     @Test
@@ -405,6 +438,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
 
         rs.close();
         stmt.close();
+
+        if (!localIndex) {
+            checkIndexTableIsVerified(indexTableName);
+        }
     }
     
     @Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 63840e7..ecd6110 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -56,6 +57,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,6 +109,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     protected PhoenixConnection conn;
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    protected IndexStatusUpdater[] indexStatusUpdaters;
     protected List<String> tableNames;
     protected List<String> logicalNames;
     protected MapperUpsertListener<RECORD> upsertListener;
@@ -179,18 +182,19 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             while (uncommittedDataIterator.hasNext()) {
                 Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
                 List<KeyValue> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
+                byte[] tableName = kvPair.getFirst();
+                keyValueList = preUpdateProcessor.preUpsert(tableName, keyValueList);
                 // Create a list of KV for each table
                 for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), tableName) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<KeyValue>());
                         }
-                        List<KeyValue> list = map.get(i);
-                        for (KeyValue kv : keyValueList) {
-                            list.add(kv);
+                        List<KeyValue> cellsForTable = map.get(i);
+                        if (indexStatusUpdaters[i] != null) {
+                            indexStatusUpdaters[i].setVerfied(keyValueList);
                         }
+                        cellsForTable.addAll(keyValueList);
                         break;
                     }
                 }
@@ -213,6 +217,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
      */
     private void initColumnIndexes() throws SQLException {
         columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        indexStatusUpdaters = new IndexStatusUpdater[logicalNames.size()];
         int columnIndex = 0;
         for (int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -249,6 +254,10 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
             columnIndexes.put(cfn, new Integer(columnIndex));
             columnIndex++;
+            if (PTable.IndexType.GLOBAL == table.getIndexType()) {
+                indexStatusUpdaters[index] =
+                        new IndexStatusUpdater(emptyColumnFamily, emptyKeyValue);
+            }
         }
     }
 
@@ -414,8 +423,46 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             ImportPreUpsertKeyValueProcessor {
 
         @Override
-        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+        public List<KeyValue> preUpsert(byte[] tableName, List<KeyValue> keyValues) {
             return keyValues;
         }
     }
+
+    /**
+     * Updates the EMPTY cell value to VERIFIED for global index table rows
+     */
+    private static class IndexStatusUpdater {
+
+        private final byte[] emptyKeyValueCF;
+        private final int emptyKeyValueCFLength;
+        private final byte[] emptyKeyValueQualifier;
+        private final int emptyKeyValueQualifierLength;
+
+        public IndexStatusUpdater(final byte[] emptyKeyValueCF, final byte[] emptyKeyValueQualifier) {
+            this.emptyKeyValueCF = emptyKeyValueCF;
+            this.emptyKeyValueQualifier = emptyKeyValueQualifier;
+            emptyKeyValueCFLength = emptyKeyValueCF.length;
+            emptyKeyValueQualifierLength = emptyKeyValueQualifier.length;
+        }
+
+        /**
+         * Update the Empty cell values to VERIFIED in the passed keyValues list
+         * 
+         * @param keyValues will be modified
+         */
+        public void setVerfied(List<KeyValue> keyValues) {
+            for (int i = 0; i < keyValues.size() ; i++) {
+                Cell kv = keyValues.get(i);
+                if (CellUtil.matchingFamily(kv, emptyKeyValueCF, 0, emptyKeyValueCFLength)
+                        && CellUtil.matchingQualifier(kv, emptyKeyValueQualifier, 0, emptyKeyValueQualifierLength)) {
+                    if (kv.getValueLength() != 1) {
+                        // This should never happen. Fail fast if it does.
+                        throw new IllegalArgumentException("Empty cell value length is not 1");
+                    }
+                    //We are directly overwriting the value for performance
+                    kv.getValueArray()[kv.getValueOffset()] = IndexRegionObserver.VERIFIED_BYTES[0];
+                }
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
index dff9ef2..d924df1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
@@ -39,10 +39,10 @@ public interface ImportPreUpsertKeyValueProcessor {
      * Implementors can filter certain KeyValues from the list, augment the list, or return the
      * same list.
      *
-     * @param rowKey the row key for the key values that are being passed in
+     * @param tableName the table name for the key values that are being passed in
      * @param keyValues list of KeyValues that are to be written to an HFile
      * @return the list that will actually be written
      */
-    List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues);
+    List<KeyValue> preUpsert(byte[] tableName, List<KeyValue> keyValues);
 
 }


[phoenix] 04/07: PHOENIX-6388 Add sampled logging for read repairs

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit d1cd37fd87ce30edb7f9d22faa0aec00df07ef1f
Author: Xinyi Yan <ya...@apache.org>
AuthorDate: Tue Feb 23 10:41:51 2021 -0800

    PHOENIX-6388 Add sampled logging for read repairs
---
 .../apache/phoenix/index/GlobalIndexChecker.java   | 27 ++++++++++++++++++++++
 1 file changed, 27 insertions(+)
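
Note: the shouldLog() method added in this patch samples log statements by comparing a random draw against a configured percentage (phoenix.index.repair.logging.percent, default 1). The sketch below isolates that check so it can be run on its own. SampledLoggingSketch and its constructor are hypothetical; only the comparison itself mirrors the patch.

```java
import java.util.Random;

final class SampledLoggingSketch {
    private final double loggingPercent;
    private final Random random;

    // The seed is passed in here for repeatability; the patch seeds from the current time.
    SampledLoggingSketch(double loggingPercent, long seed) {
        this.loggingPercent = loggingPercent;
        this.random = new Random(seed);
    }

    // True for roughly loggingPercent out of every 100 calls.
    boolean shouldLog() {
        if (loggingPercent == 0) {
            return false;
        }
        // nextDouble() is uniform in [0.0, 1.0), so 100 percent always passes.
        return random.nextDouble() <= (loggingPercent / 100.0d);
    }

    public static void main(String[] args) {
        SampledLoggingSketch sampler = new SampledLoggingSketch(1.0, 42L);
        int logged = 0;
        for (int i = 0; i < 100_000; i++) {
            if (sampler.shouldLog()) {
                logged++;
            }
        }
        // Expect a count in the neighbourhood of 1,000 (about 1 percent).
        System.out.println("logged " + logged + " of 100000 repairs");
    }
}
```

The explicit zero check matters because nextDouble() can return exactly 0.0, which would otherwise satisfy "<= 0" and emit an occasional log line even when logging is disabled.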

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 7720ae6..374c292 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -101,6 +102,9 @@ import org.slf4j.LoggerFactory;
  */
 public class GlobalIndexChecker extends BaseScannerRegionObserver {
     private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexChecker.class);
+    private static final String REPAIR_LOGGING_PERCENT_ATTRIB = "phoenix.index.repair.logging.percent";
+    private static final double DEFAULT_REPAIR_LOGGING_PERCENT = 1;
+
     private HTableFactory hTableFactory;
     private GlobalIndexCheckerSource metricsSource;
 
@@ -145,6 +149,8 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver {
         private long pageSize = Long.MAX_VALUE;
         private boolean restartScanDueToPageFilterRemoval = false;
         private boolean hasMore;
+        private double loggingPercent;
+        private Random random;
         private String indexName;
         private long pageSizeMs;
 
@@ -176,6 +182,9 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver {
                         "repairIndexRows: IndexMaintainer is not included in scan attributes for " +
                                 region.getRegionInfo().getTable().getNameAsString());
             }
+            loggingPercent = env.getConfiguration().getDouble(REPAIR_LOGGING_PERCENT_ATTRIB,
+                    DEFAULT_REPAIR_LOGGING_PERCENT);
+            random = new Random(EnvironmentEdgeManager.currentTimeMillis());
             pageSizeMs = getPageSizeMsForRegionScanner(scan);
         }
 
@@ -586,17 +595,28 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver {
                 long ts = cellList.get(0).getTimestamp();
                 cellList.clear();
 
+                long repairTime;
                 try {
                     repairIndexRows(rowKey, ts, cellList);
+                    repairTime = EnvironmentEdgeManager.currentTimeMillis() - repairStart;
                     metricsSource.incrementIndexRepairs(indexName);
                     metricsSource.updateUnverifiedIndexRowAge(indexName,
                         EnvironmentEdgeManager.currentTimeMillis() - ts);
                     metricsSource.updateIndexRepairTime(indexName,
                         EnvironmentEdgeManager.currentTimeMillis() - repairStart);
+                    if (shouldLog()) {
+                        LOG.info("Index row repair on region {} took {} ms.",
+                                env.getRegionInfo().getRegionNameAsString(), repairTime);
+                    }
                 } catch (IOException e) {
+                    repairTime = EnvironmentEdgeManager.currentTimeMillis() - repairStart;
                     metricsSource.incrementIndexRepairFailures(indexName);
                     metricsSource.updateIndexRepairFailureTime(indexName,
                         EnvironmentEdgeManager.currentTimeMillis() - repairStart);
+                    if (shouldLog()) {
+                        LOG.warn("Index row repair failure on region {} took {} ms.",
+                                env.getRegionInfo().getRegionNameAsString(), repairTime);
+                    }
                     throw e;
                 }
 
@@ -607,6 +627,13 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver {
                 return true;
             }
         }
+
+        private boolean shouldLog() {
+            if (loggingPercent == 0) {
+                return false;
+            }
+            return (random.nextDouble() <= (loggingPercent / 100.0d));
+        }
     }
 
     @Override


[phoenix] 07/07: PHOENIX-6408 LIMIT on local index query with uncovered columns in the WHERE returns wrong result.

Posted by ya...@apache.org.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit c619d8e793fc4e13ca634dbf05bc418f0e0f9517
Author: Lars <la...@apache.org>
AuthorDate: Tue Mar 9 19:36:48 2021 -0800

    PHOENIX-6408 LIMIT on local index query with uncovered columns in the WHERE returns wrong result.
---
 .../org/apache/phoenix/end2end/index/LocalIndexIT.java  | 17 +++++++++++++++++
 .../phoenix/coprocessor/BaseScannerRegionObserver.java  |  1 +
 .../org/apache/phoenix/iterate/BaseResultIterators.java |  9 ++++++++-
 .../java/org/apache/phoenix/iterate/ExplainTable.java   | 11 ++++++++++-
 .../apache/phoenix/iterate/RegionScannerFactory.java    |  8 ++++++++
 5 files changed, 44 insertions(+), 2 deletions(-)
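
Note: this patch stops attaching a PageFilter to the scan when a local index filter is present and instead passes the limit as a scan attribute, so that the limit is applied after the filter on uncovered columns. The sketch below shows, on plain lists, why the order matters. It is an illustration under simplified assumptions, not the scanner code: LimitAfterFilterSketch and both methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class LimitAfterFilterSketch {
    // Wrong order: count rows toward the limit before the extra filter runs.
    static List<Integer> limitThenFilter(List<Integer> rows, Predicate<Integer> filter, int limit) {
        List<Integer> result = new ArrayList<>();
        int seen = 0;
        for (Integer row : rows) {
            if (seen++ >= limit) {
                break;
            }
            if (filter.test(row)) {
                result.add(row);
            }
        }
        return result;
    }

    // Right order: only rows that pass the filter count toward the limit.
    static List<Integer> filterThenLimit(List<Integer> rows, Predicate<Integer> filter, int limit) {
        List<Integer> result = new ArrayList<>();
        for (Integer row : rows) {
            if (result.size() >= limit) {
                break;
            }
            if (filter.test(row)) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> v3Values = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Predicate<Integer> v3GreaterThan5 = v -> v > 5;
        System.out.println(limitThenFilter(v3Values, v3GreaterThan5, 2));
        System.out.println(filterThenLimit(v3Values, v3GreaterThan5, 2));
    }
}
```

With "v3 > 5 LIMIT 2", limiting first inspects only the first two rows and returns nothing, while filtering first returns two matching rows, which is what the new LocalIndexIT assertions expect.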

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index a3f3ed1..2de2c94 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -152,6 +152,23 @@ public class LocalIndexIT extends BaseLocalIndexIT {
         rs.next();
         assertEquals(6, rs.getInt(1));
         rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM "+tableName+" WHERE v1 > 0 AND v3 > 5 LIMIT 2");
+        assertTrue(rs.next());
+        assertTrue(rs.next());
+        assertFalse(rs.next());
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM "+tableName+" WHERE v1 > 0 AND v3 > 5 LIMIT 1");
+        assertTrue(rs.next());
+        assertFalse(rs.next());
+        rs.close();
+
+        rs = conn.createStatement().executeQuery("SELECT * FROM "+tableName+" WHERE v3 > 5 ORDER BY v1 LIMIT 2");
+        assertTrue(rs.next());
+        assertTrue(rs.next());
+        assertFalse(rs.next());
+        rs.close();
     }
 
     @Test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 7bb84f7..b4c204b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String INDEX_REBUILD_DISABLE_LOGGING_BEYOND_MAXLOOKBACK_AGE =
         "_IndexRebuildDisableLoggingBeyondMaxLookbackAge";
     public static final String LOCAL_INDEX_FILTER = "_LocalIndexFilter";
+    public static final String LOCAL_INDEX_LIMIT = "_LocalIndexLimit";
     public static final String LOCAL_INDEX_FILTER_STR = "_LocalIndexFilterStr";
 
     /* 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 9190a33..882cfaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -281,7 +281,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             }
 
             if (perScanLimit != null) {
-                ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+                if (scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_FILTER) == null) {
+                    ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+                } else {
+                    // If we have a local index filter and a limit, handle the limit after the filter.
+                    // We cast the limit to a long even though it is passed as an Integer so that,
+                    // if we need to extend this in the future, the serialization is unchanged.
+                    scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_LIMIT, Bytes.toBytes((long)perScanLimit));
+                }
             }
             
             if(offset!=null){
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
index cf5e021..163364c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java
@@ -226,8 +226,17 @@ public abstract class ExplainTable {
             if (offset != null) {
                 planSteps.add("    SERVER OFFSET " + offset);
             }
+            Long limit = null;
             if (pageFilter != null) {
-                planSteps.add("    SERVER " + pageFilter.getPageSize() + " ROW LIMIT");
+                limit = pageFilter.getPageSize();
+            } else {
+                byte[] limitBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_LIMIT);
+                if (limitBytes != null) {
+                    limit = Bytes.toLong(limitBytes);
+                }
+            }
+            if (limit != null) {
+                planSteps.add("    SERVER " + limit + " ROW LIMIT");
             }
             if (explainPlanAttributesBuilder != null) {
                 explainPlanAttributesBuilder.setServerOffset(offset);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 6be45d3..414f294 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -132,6 +132,7 @@ public abstract class RegionScannerFactory {
       private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
       final long pageSizeMs = getPageSizeMsForRegionScanner(scan);
       Expression extraWhere = null;
+      long extraLimit = -1;
 
       {
           // for local indexes construct the row filter for uncovered columns if it exists
@@ -148,6 +149,10 @@ public abstract class RegionScannerFactory {
                       throw new RuntimeException(io);
                   }
               }
+              byte[] limitBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_LIMIT);
+              if (limitBytes != null) {
+                  extraLimit = Bytes.toLong(limitBytes);
+              }
           }
       }
 
@@ -259,6 +264,9 @@ public abstract class RegionScannerFactory {
               result.add(arrayElementCell);
             }
           }
+          if (extraLimit >= 0 && --extraLimit == 0) {
+              return false;
+          }
           // There is a scanattribute set to retrieve the specific array element
           return next;
         } catch (Throwable t) {