You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@phoenix.apache.org by td...@apache.org on 2018/09/06 17:33:56 UTC
phoenix git commit: PHOENIX-4791 Array elements are nullified with joins
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.2 9390a91e9 -> 585692207
PHOENIX-4791 Array elements are nullified with joins
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/58569220
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/58569220
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/58569220
Branch: refs/heads/4.x-HBase-1.2
Commit: 58569220706b8c1d49ae649757abf8dca5818862
Parents: 9390a91
Author: Gerald Sangudi <gs...@23andme.com>
Authored: Wed Aug 22 16:59:12 2018 -0700
Committer: Thomas D'Silva <td...@apache.org>
Committed: Thu Sep 6 10:29:59 2018 -0700
----------------------------------------------------------------------
.../ProjectArrayElemAfterHashJoinIT.java | 177 +++++++++++++++++++
.../coprocessor/HashJoinRegionScanner.java | 69 ++++++--
.../NonAggregateRegionScannerFactory.java | 5 +-
.../phoenix/iterate/RegionScannerFactory.java | 7 +-
4 files changed, 243 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java
new file mode 100644
index 0000000..170eb69
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class ProjectArrayElemAfterHashJoinIT extends ParallelStatsDisabledIT {
+
+ @Test
+ public void testSalted() throws Exception {
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+ try {
+ String table = createSalted(conn);
+ testTable(conn, table);
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testUnsalted() throws Exception {
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+ try {
+ String table = createUnsalted(conn);
+ testTable(conn, table);
+ } finally {
+ conn.close();
+ }
+ }
+
+ private void testTable(Connection conn, String table) throws Exception {
+
+ verifyExplain(conn, table, false, false);
+ verifyExplain(conn, table, false, true);
+ verifyExplain(conn, table, true, false);
+ verifyExplain(conn, table, true, true);
+
+ verifyResults(conn, table, false, false);
+ verifyResults(conn, table, false, true);
+ verifyResults(conn, table, true, false);
+ verifyResults(conn, table, true, true);
+ }
+
+ private String createSalted(Connection conn) throws Exception {
+
+ String table = "SALTED_" + generateUniqueName();
+ String create = "CREATE TABLE " + table + " ("
+ + " id INTEGER NOT NULL,"
+ + " vals TINYINT[],"
+ + " CONSTRAINT pk PRIMARY KEY (id)"
+ + ") SALT_BUCKETS = 4";
+
+ conn.createStatement().execute(create);
+ return table;
+ }
+
+ private String createUnsalted(Connection conn) throws Exception {
+
+ String table = "UNSALTED_" + generateUniqueName();
+ String create = "CREATE TABLE " + table + " ("
+ + " id INTEGER NOT NULL,"
+ + " vals TINYINT[],"
+ + " CONSTRAINT pk PRIMARY KEY (id)"
+ + ")";
+
+ conn.createStatement().execute(create);
+ return table;
+ }
+
+ private String getQuery(String table, boolean fullArray, boolean hashJoin) {
+
+ String query = "SELECT id, vals[1] v1, vals[2] v2, vals[3] v3, vals[4] v4"
+ + (fullArray ? ", vals" : "")
+ + " FROM " + table
+ + " WHERE id IN "
+ + (hashJoin ? "(SELECT 1)" : "(1, 2, 3)")
+ ;
+
+ return query;
+ }
+
+ private void verifyExplain(Connection conn, String table, boolean fullArray, boolean hashJoin)
+ throws Exception {
+
+ String query = "EXPLAIN " + getQuery(table, fullArray, hashJoin);
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(query);
+
+ try {
+ String plan = QueryUtil.getExplainPlan(rs);
+ assertTrue(plan != null);
+ assertTrue(fullArray || plan.contains("SERVER ARRAY ELEMENT PROJECTION"));
+ assertTrue(hashJoin == plan.contains("JOIN"));
+ } finally {
+ rs.close();
+ }
+ }
+
+ private void verifyResults(Connection conn, String table, boolean fullArray, boolean hashJoin)
+ throws Exception {
+
+ String upsert = "UPSERT INTO " + table + "(id, vals)"
+ + " VALUES(1, ARRAY[10, 20, 30, 40, 50])";
+ PreparedStatement upsertStmt = conn.prepareStatement(upsert);
+ upsertStmt.execute();
+ conn.commit();
+
+ String query = getQuery(table, fullArray, hashJoin);
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(query);
+
+ try {
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt("id"));
+ assertEquals(10, rs.getInt("v1"));
+ assertEquals(20, rs.getInt("v2"));
+ assertEquals(30, rs.getInt("v3"));
+ assertEquals(40, rs.getInt("v4"));
+
+ if (fullArray) {
+ java.sql.Array array = rs.getArray("vals");
+ assertTrue(array != null);
+ Object obj = array.getArray();
+ assertTrue(obj != null);
+ assertTrue(obj.getClass().isArray());
+ assertEquals(5, java.lang.reflect.Array.getLength(obj));
+ }
+
+ assertFalse(rs.next());
+ } finally {
+ rs.close();
+ }
+ }
+
+ private void dropTable(Connection conn, String table) throws Exception {
+
+ String drop = "DROP TABLE " + table;
+ Statement stmt = conn.createStatement();
+ stmt.execute(drop);
+ stmt.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 59f844d..d82aaba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -18,10 +18,12 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -37,12 +39,15 @@ import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.iterate.RegionScannerFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -66,9 +71,27 @@ public class HashJoinRegionScanner implements RegionScanner {
private ValueBitSet[] tempSrcBitSet;
private final boolean useQualifierAsListIndex;
private final boolean useNewValueColumnQualifier;
-
+ private final boolean addArrayCell;
+
+ @SuppressWarnings("unchecked")
+ public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector,
+ HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
+ RegionCoprocessorEnvironment env, boolean useQualifierAsIndex,
+ boolean useNewValueColumnQualifier)
+ throws IOException {
+
+ this(env, scanner, null, null, projector, joinInfo,
+ tenantId, useQualifierAsIndex, useNewValueColumnQualifier);
+ }
+
@SuppressWarnings("unchecked")
- public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) throws IOException {
+ public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner,
+ final Set<KeyValueColumnExpression> arrayKVRefs,
+ final Expression[] arrayFuncRefs, TupleProjector projector,
+ HashJoinInfo joinInfo, ImmutableBytesPtr tenantId,
+ boolean useQualifierAsIndex, boolean useNewValueColumnQualifier)
+ throws IOException {
+
this.env = env;
this.scanner = scanner;
this.projector = projector;
@@ -103,7 +126,7 @@ public class HashJoinRegionScanner implements RegionScanner {
Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId)));
throw new DoNotRetryIOException(cause.getMessage(), cause);
}
-
+
hashCaches[i] = hashCache;
tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
}
@@ -113,16 +136,21 @@ public class HashJoinRegionScanner implements RegionScanner {
}
this.useQualifierAsListIndex = useQualifierAsIndex;
this.useNewValueColumnQualifier = useNewValueColumnQualifier;
+ this.addArrayCell = (arrayFuncRefs != null && arrayFuncRefs.length > 0 &&
+ arrayKVRefs != null && arrayKVRefs.size() > 0);
}
private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
if (result.isEmpty())
return;
Tuple tuple = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
+ boolean projected = false;
+
// For backward compatibility. In new versions, HashJoinInfo.forceProjection()
// always returns true.
if (joinInfo.forceProjection()) {
tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
+ projected = true;
}
// TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well.
@@ -150,14 +178,15 @@ public class HashJoinRegionScanner implements RegionScanner {
dup *= (tempTuples[i] == null ? 1 : tempTuples[i].size());
}
for (int i = 0; i < dup; i++) {
- resultQueue.offer(tuple);
+ offerResult(tuple, projected, result);
}
} else {
KeyValueSchema schema = joinInfo.getJoinedSchema();
if (!joinInfo.forceProjection()) { // backward compatibility
tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
+ projected = true;
}
- resultQueue.offer(tuple);
+ offerResult(tuple, projected, result);
for (int i = 0; i < count; i++) {
boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
JoinType type = joinInfo.getJoinTypes()[i];
@@ -173,7 +202,7 @@ public class HashJoinRegionScanner implements RegionScanner {
if (type == JoinType.Inner || type == JoinType.Semi) {
continue;
} else if (type == JoinType.Anti) {
- resultQueue.offer(lhs);
+ offerResult(lhs, projected, result);
continue;
}
}
@@ -182,18 +211,18 @@ public class HashJoinRegionScanner implements RegionScanner {
Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
lhs : TupleProjector.mergeProjectedValue(
(ProjectedValueTuple) lhs, schema, tempDestBitSet,
- null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
+ null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
joinInfo.getFieldPositions()[i], useNewValueColumnQualifier);
- resultQueue.offer(joined);
+ offerResult(joined, projected, result);
continue;
}
for (Tuple t : tempTuples[i]) {
Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
lhs : TupleProjector.mergeProjectedValue(
(ProjectedValueTuple) lhs, schema, tempDestBitSet,
- t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
+ t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
joinInfo.getFieldPositions()[i], useNewValueColumnQualifier);
- resultQueue.offer(joined);
+ offerResult(joined, projected, result);
}
}
}
@@ -265,7 +294,7 @@ public class HashJoinRegionScanner implements RegionScanner {
processResults(result, false);
result.clear();
}
-
+
return nextInQueue(result);
} catch (Throwable t) {
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
@@ -309,5 +338,21 @@ public class HashJoinRegionScanner implements RegionScanner {
return this.scanner.getBatch();
}
-}
+ // PHOENIX-4791 Propagate array element cell through hash join
+ private void offerResult(Tuple tuple, boolean projected, List<Cell> result) {
+ if (!projected || !addArrayCell) {
+ resultQueue.offer(tuple);
+ return;
+ }
+ Cell projectedCell = tuple.getValue(0);
+ int arrayCellPosition = RegionScannerFactory.getArrayCellPosition(result);
+ Cell arrayCell = result.get(arrayCellPosition);
+
+ List<Cell> cells = new ArrayList<Cell>(2);
+ cells.add(projectedCell);
+ cells.add(arrayCell);
+ MultiKeyValueTuple multi = new MultiKeyValueTuple(cells);
+ resultQueue.offer(multi);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index cc7221e..1504a7c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -152,8 +152,9 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
if (j != null) {
- innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, env, useQualifierAsIndex,
- useNewValueColumnQualifier);
+ innerScanner = new HashJoinRegionScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs,
+ p, j, tenantId, useQualifierAsIndex,
+ useNewValueColumnQualifier);
}
if (scanOffset != null) {
innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/58569220/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index aed5805..b47d6b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -286,7 +286,7 @@ public abstract class RegionScannerFactory {
QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
KeyValue.Type.codeToType(rowKv.getTypeByte()), value, 0, value.length));
- return result.size() - 1;
+ return getArrayCellPosition(result);
}
@Override
@@ -300,4 +300,9 @@ public abstract class RegionScannerFactory {
}
};
}
+
+ // PHOENIX-4791 Share position of array element cell
+ public static int getArrayCellPosition(List<Cell> result) {
+ return result.size() - 1;
+ }
}