You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/03/19 16:35:17 UTC
[phoenix] 02/02: PHOENIX-6422 Remove CorrelatePlan
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit e4ea2c547d3093e7f6538e3cb2f7ea351ce4402a
Author: Istvan Toth <st...@apache.org>
AuthorDate: Fri Mar 19 09:42:32 2021 +0100
PHOENIX-6422 Remove CorrelatePlan
---
.../org/apache/phoenix/execute/CorrelatePlan.java | 265 ------------------
.../execute/visitor/AvgRowWidthVisitor.java | 6 -
.../phoenix/execute/visitor/ByteCountVisitor.java | 6 -
.../phoenix/execute/visitor/QueryPlanVisitor.java | 1 -
.../phoenix/execute/visitor/RowCountVisitor.java | 11 -
.../apache/phoenix/compile/QueryCompilerTest.java | 6 -
.../apache/phoenix/execute/CorrelatePlanTest.java | 299 ---------------------
7 files changed, 594 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
deleted file mode 100644
index 991ac5b..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.compile.ExplainPlan;
-import org.apache.phoenix.compile.ExplainPlanAttributes;
-import org.apache.phoenix.compile.ExplainPlanAttributes
- .ExplainPlanAttributesBuilder;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
-import org.apache.phoenix.execute.visitor.ByteCountVisitor;
-import org.apache.phoenix.execute.visitor.RowCountVisitor;
-import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
-import org.apache.phoenix.iterate.ParallelScanGrouper;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.JoinTableNode.JoinType;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.SchemaUtil;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-
-public class CorrelatePlan extends DelegateQueryPlan {
- private final QueryPlan rhs;
- private final String variableId;
- private final JoinType joinType;
- private final boolean isSingleValueOnly;
- private final RuntimeContext runtimeContext;
- private final KeyValueSchema joinedSchema;
- private final KeyValueSchema lhsSchema;
- private final KeyValueSchema rhsSchema;
- private final int rhsFieldPosition;
-
-
- public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId,
- JoinType joinType, boolean isSingleValueOnly,
- RuntimeContext runtimeContext, PTable joinedTable,
- PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
- super(lhs);
- if (joinType != JoinType.Inner && joinType != JoinType.Left && joinType != JoinType.Semi && joinType != JoinType.Anti)
- throw new IllegalArgumentException("Unsupported join type '" + joinType + "' by CorrelatePlan");
-
- this.rhs = rhs;
- this.variableId = variableId;
- this.joinType = joinType;
- this.isSingleValueOnly = isSingleValueOnly;
- this.runtimeContext = runtimeContext;
- this.joinedSchema = buildSchema(joinedTable);
- this.lhsSchema = buildSchema(lhsTable);
- this.rhsSchema = buildSchema(rhsTable);
- this.rhsFieldPosition = rhsFieldPosition;
- }
-
- private static KeyValueSchema buildSchema(PTable table) {
- KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
- if (table != null) {
- for (PColumn column : table.getColumns()) {
- if (!SchemaUtil.isPKColumn(column)) {
- builder.addField(column);
- }
- }
- }
- return builder.build();
- }
-
- @Override
- public ExplainPlan getExplainPlan() throws SQLException {
- List<String> steps = Lists.newArrayList();
- steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + ") TABLES");
- ExplainPlan lhsExplainPlan = delegate.getExplainPlan();
- List<String> lhsPlanSteps = lhsExplainPlan.getPlanSteps();
- ExplainPlanAttributes lhsPlanAttributes =
- lhsExplainPlan.getPlanStepsAsAttributes();
- ExplainPlanAttributesBuilder lhsPlanBuilder =
- new ExplainPlanAttributesBuilder(lhsPlanAttributes);
- lhsPlanBuilder.setAbstractExplainPlan("NESTED-LOOP-JOIN ("
- + joinType.toString().toUpperCase() + ")");
-
- for (String step : lhsPlanSteps) {
- steps.add(" " + step);
- }
- steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : ""));
-
- ExplainPlan rhsExplainPlan = rhs.getExplainPlan();
- List<String> rhsPlanSteps = rhsExplainPlan.getPlanSteps();
- ExplainPlanAttributes rhsPlanAttributes =
- rhsExplainPlan.getPlanStepsAsAttributes();
- ExplainPlanAttributesBuilder rhsPlanBuilder =
- new ExplainPlanAttributesBuilder(rhsPlanAttributes);
-
- lhsPlanBuilder.setRhsJoinQueryExplainPlan(rhsPlanBuilder.build());
-
- for (String step : rhsPlanSteps) {
- steps.add(" " + step);
- }
- return new ExplainPlan(steps, lhsPlanBuilder.build());
- }
-
- @Override
- public ResultIterator iterator(final ParallelScanGrouper scanGrouper, final Scan scan)
- throws SQLException {
- return new CorrelateResultIterator(scanGrouper, scan) ;
- }
-
- @Override
- public Integer getLimit() {
- return null;
- }
-
- @Override
- public <T> T accept(QueryPlanVisitor<T> visitor) {
- return visitor.visit(this);
- }
-
- public QueryPlan getRhsPlan() {
- return rhs;
- }
-
- @Override
- public Cost getCost() {
- Double lhsByteCount = delegate.accept(new ByteCountVisitor());
- Double rhsRowCount = rhs.accept(new RowCountVisitor());
-
- if (lhsByteCount == null || rhsRowCount == null) {
- return Cost.UNKNOWN;
- }
-
- Cost cost = new Cost(0, 0, lhsByteCount * rhsRowCount);
- Cost lhsCost = delegate.getCost();
- return cost.plus(lhsCost).plus(rhs.getCost());
- }
-
- private class CorrelateResultIterator implements ResultIterator {
- private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
- private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema);
- private final ValueBitSet rhsBitSet =
- (joinType == JoinType.Semi || joinType == JoinType.Anti) ?
- ValueBitSet.EMPTY_VALUE_BITSET
- : ValueBitSet.newInstance(rhsSchema);
- private final ResultIterator iter;
- private ResultIterator rhsIter = null;
- private Tuple current = null;
- private boolean closed = false;
-
- private CorrelateResultIterator(ParallelScanGrouper scanGrouper, Scan scan)
- throws SQLException {
- iter = delegate.iterator(scanGrouper, scan);
- }
-
- @Override
- public void close() throws SQLException {
- if (!closed) {
- closed = true;
- iter.close();
- if (rhsIter != null) {
- rhsIter.close();
- }
- }
- }
-
- @Override
- public Tuple next() throws SQLException {
- if (closed)
- return null;
-
- Tuple rhsCurrent = null;
- if (rhsIter != null) {
- rhsCurrent = rhsIter.next();
- if (rhsCurrent == null) {
- rhsIter.close();
- rhsIter = null;
- } else if (isSingleValueOnly) {
- throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS)
- .build().buildException();
- }
- }
- while (rhsIter == null) {
- current = iter.next();
- if (current == null) {
- close();
- return null;
- }
- runtimeContext.setCorrelateVariableValue(variableId, current);
- rhsIter = rhs.iterator();
- rhsCurrent = rhsIter.next();
- if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi))
- || (rhsCurrent != null && joinType == JoinType.Anti)) {
- rhsIter.close();
- rhsIter = null;
- }
- }
-
- Tuple joined;
- try {
- joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
- current : TupleProjector.mergeProjectedValue(
- convertLhs(current), destBitSet,
- rhsCurrent, rhsBitSet, rhsFieldPosition, true);
- } catch (IOException e) {
- throw new SQLException(e);
- }
-
- if ((joinType == JoinType.Semi || rhsCurrent == null) && rhsIter != null) {
- rhsIter.close();
- rhsIter = null;
- }
-
- return joined;
- }
-
- @Override
- public void explain(List<String> planSteps) { }
-
- @Override
- public void explain(List<String> planSteps,
- ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
- }
-
- private ProjectedValueTuple convertLhs(Tuple lhs) throws IOException {
- ProjectedValueTuple tuple;
- if (lhs instanceof ProjectedValueTuple) {
- tuple = (ProjectedValueTuple) lhs;
- } else {
- ImmutableBytesWritable ptr = getContext().getTempPtr();
- TupleProjector.decodeProjectedValue(lhs, ptr);
- lhsBitSet.clear();
- lhsBitSet.or(ptr);
- int bitSetLen = lhsBitSet.getEstimatedLength();
- tuple = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(),
- ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen);
-
- }
- return tuple;
- }
- }
-
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
index 9525747..ada8664 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
@@ -23,7 +23,6 @@ import org.apache.phoenix.compile.TraceQueryPlan;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
import org.apache.phoenix.execute.CursorFetchPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.LiteralResultIterationPlan;
@@ -156,11 +155,6 @@ public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> {
}
@Override
- public Double visit(CorrelatePlan plan) {
- return plan.getDelegate().accept(this);
- }
-
- @Override
public Double visit(CursorFetchPlan plan) {
return plan.getDelegate().accept(this);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
index 61a2895..7e9e88b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
@@ -23,7 +23,6 @@ import org.apache.phoenix.compile.TraceQueryPlan;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
import org.apache.phoenix.execute.CursorFetchPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.LiteralResultIterationPlan;
@@ -94,11 +93,6 @@ public class ByteCountVisitor implements QueryPlanVisitor<Double> {
}
@Override
- public Double visit(CorrelatePlan plan) {
- return getByteCountFromRowCountAndRowWidth(plan);
- }
-
- @Override
public Double visit(CursorFetchPlan plan) {
return getByteCountFromRowCountAndRowWidth(plan);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
index a7ae3af..9229f9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
@@ -39,7 +39,6 @@ public interface QueryPlanVisitor<E> {
E visit(SortMergeJoinPlan plan);
E visit(UnionPlan plan);
E visit(UnnestArrayPlan plan);
- E visit(CorrelatePlan plan);
E visit(CursorFetchPlan plan);
E visit(ListJarsQueryPlan plan);
E visit(TraceQueryPlan plan);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
index 58ceea9..81c39a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
@@ -26,7 +26,6 @@ import org.apache.phoenix.compile.TraceQueryPlan;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
import org.apache.phoenix.execute.CursorFetchPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.LiteralResultIterationPlan;
@@ -206,16 +205,6 @@ public class RowCountVisitor implements QueryPlanVisitor<Double> {
}
@Override
- public Double visit(CorrelatePlan plan) {
- Double lhsRows = plan.getDelegate().accept(this);
- if (lhsRows != null) {
- return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
- }
-
- return null;
- }
-
- @Override
public Double visit(CursorFetchPlan plan) {
return plan.getDelegate().accept(this);
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 16f9489..17c369d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -59,7 +59,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
import org.apache.phoenix.execute.CursorFetchPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
@@ -5240,11 +5239,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
}
@Override
- public List<QueryPlan> visit(CorrelatePlan plan) {
- return Collections.emptyList();
- }
-
- @Override
public List<QueryPlan> visit(CursorFetchPlan plan) {
return Collections.emptyList();
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
deleted file mode 100644
index 0531cf8..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
-import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
-import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.compile.ColumnResolver;
-import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.compile.JoinCompiler;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.TupleProjectionCompiler;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.expression.ComparisonExpression;
-import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.expression.ProjectedColumnExpression;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.JoinTableNode.JoinType;
-import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.EncodedCQCounter;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTableImpl;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.junit.Test;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-
-public class CorrelatePlanTest {
-
- private static final StatementContext CONTEXT;
- static {
- try {
- PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class);
- PhoenixStatement stmt = new PhoenixStatement(connection);
- ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
- CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static final Object[][] LEFT_RELATION = new Object[][] {
- {1, "1"},
- {2, "2"},
- {3, "3"},
- {4, "4"},
- {5, "5"},
- };
-
- private static final Object[][] RIGHT_RELATION = new Object[][] {
- {"2", 20},
- {"2", 40},
- {"5", 50},
- {"6", 60},
- {"5", 100},
- {"1", 10},
- {"3", 30},
- };
-
- @Test
- public void testCorrelatePlanWithInnerJoinType() throws SQLException {
- Object[][] expected = new Object[][] {
- {1, "1", "1", 10},
- {2, "2", "2", 20},
- {2, "2", "2", 40},
- {3, "3", "3", 30},
- {5, "5", "5", 50},
- {5, "5", "5", 100},
- };
- testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected);
- }
-
- @Test
- public void testCorrelatePlanWithLeftJoinType() throws SQLException {
- Object[][] expected = new Object[][] {
- {1, "1", "1", 10},
- {2, "2", "2", 20},
- {2, "2", "2", 40},
- {3, "3", "3", 30},
- {4, "4", null, null},
- {5, "5", "5", 50},
- {5, "5", "5", 100},
- };
- testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Left, expected);
- }
-
- @Test
- public void testCorrelatePlanWithSemiJoinType() throws SQLException {
- Object[][] expected = new Object[][] {
- {1, "1"},
- {2, "2"},
- {3, "3"},
- {5, "5"},
- };
- testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Semi, expected);
- }
-
- @Test
- public void testCorrelatePlanWithAntiJoinType() throws SQLException {
- Object[][] expected = new Object[][] {
- {4, "4"},
- };
- testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Anti, expected);
- }
-
- @Test
- public void testCorrelatePlanWithSingleValueOnly() throws SQLException {
- Object[][] expected = new Object[][] {
- {1, "1", "1", 10},
- {2, "2", "2", 20},
- {2, "2", "2", 40},
- };
- try {
- testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected);
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS.getErrorCode(), e.getErrorCode());
- }
-
- Object[][] rightRelation = new Object[][] {
- {"2", 20},
- {"6", 60},
- {"5", 100},
- {"1", 10},
- };
- expected = new Object[][] {
- {1, "1", "1", 10},
- {2, "2", "2", 20},
- {5, "5", "5", 100},
- };
- testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, expected);
- }
-
- @Test
- public void testCorrelatePlanWithSingleValueOnlyAndOffset() throws SQLException {
- Integer offset = 1;
- Object[][] rightRelation = new Object[][] {
- {"6", 60},
- {"2", 20},
- {"5", 100},
- {"1", 10},
- };
- Object[][] expected = new Object[][] {
- {2, "2", "2", 20},
- {5, "5", "5", 100},
- };
- testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, expected, offset);
- }
-
- private void testCorrelatePlan(Object[][] leftRelation, Object[][] rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, Object[][] expectedResult) throws SQLException {
- testCorrelatePlan(leftRelation, rightRelation, leftCorrelColumn, rightCorrelColumn, type, expectedResult, null);
- }
-
- private void testCorrelatePlan(Object[][] leftRelation, Object[][] rightRelation, int leftCorrelColumn,
- int rightCorrelColumn, JoinType type, Object[][] expectedResult, Integer offset) throws SQLException {
- TableRef leftTable = createProjectedTableFromLiterals(leftRelation[0]);
- TableRef rightTable = createProjectedTableFromLiterals(rightRelation[0]);
- String varName = "$cor0";
- RuntimeContext runtimeContext = new RuntimeContextImpl();
- runtimeContext.defineCorrelateVariable(varName, leftTable);
- QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation, offset);
- QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation, offset);
- Expression columnExpr = new ColumnRef(rightTable, rightCorrelColumn).newColumnExpression();
- Expression fieldAccess = new CorrelateVariableFieldAccessExpression(runtimeContext, varName, new ColumnRef(leftTable, leftCorrelColumn).newColumnExpression());
- Expression filter = ComparisonExpression.create(CompareOp.EQUAL, Arrays.asList(columnExpr, fieldAccess), CONTEXT.getTempPtr(), false);
- rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, rightTable, RowProjector.EMPTY_PROJECTOR,
- null, null, filter, OrderBy.EMPTY_ORDER_BY, rightPlan);
- PTable joinedTable = JoinCompiler.joinProjectedTables(leftTable.getTable(), rightTable.getTable(), type);
- CorrelatePlan correlatePlan = new CorrelatePlan(leftPlan, rightPlan, varName, type, false, runtimeContext, joinedTable, leftTable.getTable(), rightTable.getTable(), leftTable.getTable().getColumns().size());
- ResultIterator iter = correlatePlan.iterator();
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- for (Object[] row : expectedResult) {
- Tuple next = iter.next();
- assertNotNull(next);
- for (int i = 0; i < row.length; i++) {
- PColumn column = joinedTable.getColumns().get(i);
- boolean eval = new ProjectedColumnExpression(column, joinedTable, column.getName().getString()).evaluate(next, ptr);
- Object o = eval ? column.getDataType().toObject(ptr) : null;
- assertEquals(row[i], o);
- }
- }
- }
-
- private QueryPlan newLiteralResultIterationPlan(Object[][] rows, Integer offset) throws SQLException {
- List<Tuple> tuples = Lists.newArrayList();
- Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
- for (Object[] row : rows) {
- Expression[] exprs = new Expression[row.length];
- for (int i = 0; i < row.length; i++) {
- exprs[i] = LiteralExpression.newConstant(row[i]);
- }
- TupleProjector projector = new TupleProjector(exprs);
- tuples.add(projector.projectResults(baseTuple));
- }
-
- return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF,
- RowProjector.EMPTY_PROJECTOR, null, offset, OrderBy.EMPTY_ORDER_BY, null);
- }
-
-
- private TableRef createProjectedTableFromLiterals(Object[] row) {
- List<PColumn> columns = Lists.<PColumn>newArrayList();
- for (int i = 0; i < row.length; i++) {
- String name = ParseNodeFactory.createTempAlias();
- Expression expr = LiteralExpression.newConstant(row[i]);
- PName colName = PNameFactory.newName(name);
- columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(VALUE_COLUMN_FAMILY),
- expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
- i, expr.getSortOrder(), null, null, false, name, false, false, colName.getBytes(), HConstants.LATEST_TIMESTAMP));
- }
- try {
- PTable pTable = new PTableImpl.Builder()
- .setType(PTableType.SUBQUERY)
- .setTimeStamp(MetaDataProtocol.MIN_TABLE_TIMESTAMP)
- .setIndexDisableTimestamp(0L)
- .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
- .setImmutableRows(false)
- .setDisableWAL(false)
- .setMultiTenant(false)
- .setStoreNulls(false)
- .setUpdateCacheFrequency(0)
- .setNamespaceMapped(Boolean.FALSE)
- .setAppendOnlySchema(false)
- .setImmutableStorageScheme(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)
- .setQualifierEncodingScheme(QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)
- .setBaseColumnCount(BASE_TABLE_BASE_COLUMN_COUNT)
- .setEncodedCQCounter(EncodedCQCounter.NULL_COUNTER)
- .setUseStatsForParallelization(true)
- .setExcludedColumns(ImmutableList.of())
- .setSchemaName(PName.EMPTY_NAME)
- .setTableName(PName.EMPTY_NAME)
- .setRowKeyOrderOptimizable(true)
- .setIndexes(Collections.emptyList())
- .setPhysicalNames(ImmutableList.of())
- .setColumns(columns)
- .build();
- TableRef sourceTable = new TableRef(pTable);
- List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
- for (PColumn column : sourceTable.getTable().getColumns()) {
- sourceColumnRefs.add(new ColumnRef(sourceTable, column.getPosition()));
- }
-
- return new TableRef(TupleProjectionCompiler.createProjectedTable(sourceTable, sourceColumnRefs, false));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-}