You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2018/09/06 17:33:56 UTC

phoenix git commit: PHOENIX-4791 Array elements are nullified with joins

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 9390a91e9 -> 585692207


PHOENIX-4791 Array elements are nullified with joins


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/58569220
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/58569220
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/58569220

Branch: refs/heads/4.x-HBase-1.2
Commit: 58569220706b8c1d49ae649757abf8dca5818862
Parents: 9390a91
Author: Gerald Sangudi <gs...@23andme.com>
Authored: Wed Aug 22 16:59:12 2018 -0700
Committer: Thomas D'Silva <td...@apache.org>
Committed: Thu Sep 6 10:29:59 2018 -0700

----------------------------------------------------------------------
 .../ProjectArrayElemAfterHashJoinIT.java        | 177 +++++++++++++++++++
 .../coprocessor/HashJoinRegionScanner.java      |  69 ++++++--
 .../NonAggregateRegionScannerFactory.java       |   5 +-
 .../phoenix/iterate/RegionScannerFactory.java   |   7 +-
 4 files changed, 243 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java
new file mode 100644
index 0000000..170eb69
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+/**
+ * Verifies that array elements projected on the server keep their values when the
+ * query is also evaluated through a hash join (PHOENIX-4791), on salted and unsalted tables.
+ */
+public class ProjectArrayElemAfterHashJoinIT extends ParallelStatsDisabledIT {
+
+    @Test
+    public void testSalted() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            testTable(conn, createSalted(conn));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnsalted() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            testTable(conn, createUnsalted(conn));
+        } finally {
+            conn.close();
+        }
+    }
+
+    // Runs the plan and result checks for every (fullArray, hashJoin) combination.
+    private void testTable(Connection conn, String table) throws Exception {
+        verifyExplain(conn, table, false, false);
+        verifyExplain(conn, table, false, true);
+        verifyExplain(conn, table, true, false);
+        verifyExplain(conn, table, true, true);
+        verifyResults(conn, table, false, false);
+        verifyResults(conn, table, false, true);
+        verifyResults(conn, table, true, false);
+        verifyResults(conn, table, true, true);
+    }
+
+    // Executes one SQL statement and always releases the JDBC statement afterwards.
+    private void executeAndClose(Connection conn, String sql) throws Exception {
+        Statement stmt = conn.createStatement();
+        try {
+            stmt.execute(sql);
+        } finally {
+            stmt.close();
+        }
+    }
+
+    // Creates a uniquely named table with four salt buckets and returns its name.
+    private String createSalted(Connection conn) throws Exception {
+        String table = "SALTED_" + generateUniqueName();
+        String create = "CREATE TABLE " + table + " ("
+            + " id INTEGER NOT NULL,"
+            + " vals TINYINT[],"
+            + " CONSTRAINT pk PRIMARY KEY (id)"
+            + ") SALT_BUCKETS = 4";
+        executeAndClose(conn, create);
+        return table;
+    }
+
+    // Creates a uniquely named table without salting and returns its name.
+    private String createUnsalted(Connection conn) throws Exception {
+        String table = "UNSALTED_" + generateUniqueName();
+        String create = "CREATE TABLE " + table + " ("
+            + " id INTEGER NOT NULL,"
+            + " vals TINYINT[],"
+            + " CONSTRAINT pk PRIMARY KEY (id)"
+            + ")";
+        executeAndClose(conn, create);
+        return table;
+    }
+
+    // Builds the query under test: fullArray also selects the whole array column, and
+    // hashJoin swaps the literal IN list for a subquery.
+    private String getQuery(String table, boolean fullArray, boolean hashJoin) {
+        return "SELECT id, vals[1] v1, vals[2] v2, vals[3] v3, vals[4] v4"
+            + (fullArray ? ", vals" : "")
+            + " FROM " + table
+            + " WHERE id IN "
+            + (hashJoin ? "(SELECT 1)" : "(1, 2, 3)");
+    }
+
+    // Asserts on the EXPLAIN plan: server-side array element projection is required
+    // unless the full array is selected, and JOIN appears exactly when hashJoin is set.
+    private void verifyExplain(Connection conn, String table, boolean fullArray, boolean hashJoin)
+        throws Exception {
+        String query = "EXPLAIN " + getQuery(table, fullArray, hashJoin);
+        Statement stmt = conn.createStatement();
+        try {
+            // Closing the statement in the finally block also closes this result set.
+            ResultSet rs = stmt.executeQuery(query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue(plan != null);
+            assertTrue(fullArray || plan.contains("SERVER ARRAY ELEMENT PROJECTION"));
+            assertTrue(hashJoin == plan.contains("JOIN"));
+        } finally {
+            stmt.close();
+        }
+    }
+
+    // Upserts one row with a five-element array, runs the query under test and checks
+    // the projected elements (plus the full array when it is selected).
+    private void verifyResults(Connection conn, String table, boolean fullArray, boolean hashJoin)
+        throws Exception {
+        String upsert = "UPSERT INTO " + table + "(id, vals)"
+            + " VALUES(1, ARRAY[10, 20, 30, 40, 50])";
+        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
+        try {
+            upsertStmt.execute();
+        } finally {
+            upsertStmt.close();
+        }
+        conn.commit();
+
+        Statement stmt = conn.createStatement();
+        try {
+            // Closing the statement in the finally block also closes this result set.
+            ResultSet rs = stmt.executeQuery(getQuery(table, fullArray, hashJoin));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt("id"));
+            assertEquals(10, rs.getInt("v1"));
+            assertEquals(20, rs.getInt("v2"));
+            assertEquals(30, rs.getInt("v3"));
+            assertEquals(40, rs.getInt("v4"));
+
+            if (fullArray) {
+                java.sql.Array array = rs.getArray("vals");
+                assertTrue(array != null);
+                Object obj = array.getArray();
+                assertTrue(obj != null);
+                assertTrue(obj.getClass().isArray());
+                assertEquals(5, java.lang.reflect.Array.getLength(obj));
+            }
+
+            assertFalse(rs.next());
+        } finally {
+            stmt.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 59f844d..d82aaba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -18,10 +18,12 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -37,12 +39,15 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.iterate.RegionScannerFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -66,9 +71,27 @@ public class HashJoinRegionScanner implements RegionScanner {
     private ValueBitSet[] tempSrcBitSet;
     private final boolean useQualifierAsListIndex;
     private final boolean useNewValueColumnQualifier;
-    
+    private final boolean addArrayCell;
+
+    // Retains the constructor signature that predates PHOENIX-4791: no array element
+    // references are passed on, so the scanner never appends an array cell to its results.
+    public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector,
+                                 HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
+                                 RegionCoprocessorEnvironment env, boolean useQualifierAsIndex,
+                                 boolean useNewValueColumnQualifier)
+        throws IOException {
+        this(env, scanner, null, null, projector, joinInfo,
+             tenantId, useQualifierAsIndex, useNewValueColumnQualifier);
+    }
+
     @SuppressWarnings("unchecked")
-    public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) throws IOException {
+    public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner,
+                                 final Set<KeyValueColumnExpression> arrayKVRefs,
+                                 final Expression[] arrayFuncRefs, TupleProjector projector,
+                                 HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
+                                 boolean useQualifierAsIndex, boolean useNewValueColumnQualifier)
+        throws IOException {
+
         this.env = env;
         this.scanner = scanner;
         this.projector = projector;
@@ -103,7 +126,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                         Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId)));
                 throw new DoNotRetryIOException(cause.getMessage(), cause);
             }
-                
+
             hashCaches[i] = hashCache;
             tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
         }
@@ -113,16 +136,21 @@ public class HashJoinRegionScanner implements RegionScanner {
         }
         this.useQualifierAsListIndex = useQualifierAsIndex;
         this.useNewValueColumnQualifier = useNewValueColumnQualifier;
+        this.addArrayCell = (arrayFuncRefs != null && arrayFuncRefs.length > 0 &&
+                             arrayKVRefs != null && arrayKVRefs.size() > 0);
     }
 
     private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
         if (result.isEmpty())
             return;
         Tuple tuple = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
+        boolean projected = false;
+
         // For backward compatibility. In new versions, HashJoinInfo.forceProjection()
         // always returns true.
         if (joinInfo.forceProjection()) {
             tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
+            projected = true;
         }
 
         // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well.
@@ -150,14 +178,15 @@ public class HashJoinRegionScanner implements RegionScanner {
                     dup *= (tempTuples[i] == null ? 1 : tempTuples[i].size());
                 }
                 for (int i = 0; i < dup; i++) {
-                    resultQueue.offer(tuple);
+                    offerResult(tuple, projected, result);
                 }
             } else {
                 KeyValueSchema schema = joinInfo.getJoinedSchema();
                 if (!joinInfo.forceProjection()) { // backward compatibility
                     tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
+                    projected = true;
                 }
-                resultQueue.offer(tuple);
+                offerResult(tuple, projected, result);
                 for (int i = 0; i < count; i++) {
                     boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
                     JoinType type = joinInfo.getJoinTypes()[i];
@@ -173,7 +202,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                                 if (type == JoinType.Inner || type == JoinType.Semi) {
                                     continue;
                                 } else if (type == JoinType.Anti) {
-                                    resultQueue.offer(lhs);
+                                    offerResult(lhs, projected, result);
                                     continue;
                                 }
                             }
@@ -182,18 +211,18 @@ public class HashJoinRegionScanner implements RegionScanner {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
-                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
                                             joinInfo.getFieldPositions()[i], useNewValueColumnQualifier);
-                            resultQueue.offer(joined);
+                            offerResult(joined, projected, result);
                             continue;
                         }
                         for (Tuple t : tempTuples[i]) {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
-                                            t, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
                                             joinInfo.getFieldPositions()[i], useNewValueColumnQualifier);
-                            resultQueue.offer(joined);
+                            offerResult(joined, projected, result);
                         }
                     }
                 }
@@ -265,7 +294,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                 processResults(result, false);
                 result.clear();
             }
-            
+
             return nextInQueue(result);
         } catch (Throwable t) {
             ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
@@ -309,5 +338,21 @@ public class HashJoinRegionScanner implements RegionScanner {
         return this.scanner.getBatch();
     }
 
-}
+    // PHOENIX-4791: enqueue a joined row, carrying the array element cell along with it.
+    // When the row was projected and array element projection is active, the queued
+    // tuple pairs the tuple's first cell with the cell at the array cell position of the
+    // scanned row; in every other case the tuple is queued unchanged.
+    private void offerResult(Tuple tuple, boolean projected, List<Cell> result) {
+        Tuple queued = tuple;
 
+        if (projected && addArrayCell) {
+            List<Cell> pair = new ArrayList<Cell>(2);
+            pair.add(tuple.getValue(0));
+            int arrayCellPosition = RegionScannerFactory.getArrayCellPosition(result);
+            pair.add(result.get(arrayCellPosition));
+            queued = new MultiKeyValueTuple(pair);
+        }
+
+        resultQueue.offer(queued);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index cc7221e..1504a7c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -152,8 +152,9 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
 
     final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
     if (j != null) {
-      innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, env, useQualifierAsIndex,
-          useNewValueColumnQualifier);
+        innerScanner = new HashJoinRegionScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs,
+                                                 p, j, tenantId, useQualifierAsIndex,
+                                                 useNewValueColumnQualifier);
     }
     if (scanOffset != null) {
       innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index aed5805..b47d6b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -286,7 +286,7 @@ public abstract class RegionScannerFactory {
             QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
             QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
             KeyValue.Type.codeToType(rowKv.getTypeByte()), value, 0, value.length));
-        return result.size() - 1;
+        return getArrayCellPosition(result);
       }
 
       @Override
@@ -300,4 +300,9 @@ public abstract class RegionScannerFactory {
       }
     };
   }
+
+    public static int getArrayCellPosition(List<Cell> result) {
+        final int lastIndex = result.size() - 1; // PHOENIX-4791: shared array cell position
+        return lastIndex;
+    }
 }