You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/10/13 17:05:28 UTC
git commit: PHOENIX-1332 Support correlated subqueries in comparison with ANY/SOME/ALL
Repository: phoenix
Updated Branches:
refs/heads/master 656acefd1 -> 49ec34be2
PHOENIX-1332 Support correlated subqueries in comparison with ANY/SOME/ALL
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/49ec34be
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/49ec34be
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/49ec34be
Branch: refs/heads/master
Commit: 49ec34be258ce12ca150c5c37a35e2c1cad0105c
Parents: 656acef
Author: maryannxue <ma...@apache.org>
Authored: Mon Oct 13 11:05:00 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Mon Oct 13 11:05:00 2014 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/SubqueryIT.java | 79 ++++++++
.../org/apache/phoenix/cache/HashCache.java | 4 +-
.../apache/phoenix/compile/JoinCompiler.java | 15 +-
.../apache/phoenix/compile/QueryCompiler.java | 4 +-
.../phoenix/compile/SubqueryRewriter.java | 181 +++++++++++++++----
.../coprocessor/HashJoinRegionScanner.java | 1 +
.../apache/phoenix/execute/HashJoinPlan.java | 5 +-
.../expression/ArrayConstructorExpression.java | 16 +-
.../phoenix/expression/ExpressionType.java | 2 +
.../DistinctValueClientAggregator.java | 63 +++++++
.../DistinctValueWithCountServerAggregator.java | 2 +-
.../DistinctValueAggregateFunction.java | 66 +++++++
.../apache/phoenix/join/HashCacheClient.java | 7 +-
.../apache/phoenix/join/HashCacheFactory.java | 15 +-
.../org/apache/phoenix/parse/JoinTableNode.java | 8 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 6 +-
.../apache/phoenix/parse/ParseNodeRewriter.java | 2 +-
17 files changed, 417 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index 58d92f3..e4b4c8b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -899,6 +899,85 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
String plan = QueryUtil.getExplainPlan(rs);
assertTrue("\"" + plan + "\" does not match \"" + plans[4] + "\"", Pattern.matches(plans[4], plan));
+
+ query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+
+ assertFalse(rs.next());
+
+ query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ try {
+ while(rs.next());
+ fail("Should have got exception.");
+ } catch (SQLException e) {
+ }
+
+ query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+
+ assertFalse(rs.next());
+
+ query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ try {
+ while(rs.next());
+ fail("Should have got exception.");
+ } catch (SQLException e) {
+ }
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testAnyAllComparisonSubquery() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = ALL(SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000001");
+ assertEquals(rs.getString(2), "T1");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000003");
+ assertEquals(rs.getString(2), "T2");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000005");
+ assertEquals(rs.getString(2), "T3");
+
+ assertFalse(rs.next());
+
+ query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ALL(SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+
+ assertFalse(rs.next());
} finally {
conn.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
index e604f63..311f119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -18,10 +18,10 @@
package org.apache.phoenix.cache;
import java.io.Closeable;
+import java.io.IOException;
import java.util.List;
import org.apache.http.annotation.Immutable;
-
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -34,5 +34,5 @@ import org.apache.phoenix.schema.tuple.Tuple;
*/
@Immutable
public interface HashCache extends Closeable {
- public List<Tuple> get(ImmutableBytesPtr hashKey);
+ public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index d473c97..7e5382e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -187,7 +187,7 @@ public class JoinCompiler {
if (joinSpecs == null) {
joinSpecs = new ArrayList<JoinSpec>();
}
- joinSpecs.add(new JoinSpec(joinNode.getType(), joinNode.getOnNode(), joinTable, origResolver));
+ joinSpecs.add(new JoinSpec(joinNode.getType(), joinNode.getOnNode(), joinTable, joinNode.isSingleValueOnly(), origResolver));
return new Pair<Table, List<JoinSpec>>(lhs.getFirst(), joinSpecs);
}
@@ -362,7 +362,8 @@ public class JoinCompiler {
&& count > 1
&& joinSpecs.get(count - 1).getType() != JoinType.Left
&& joinSpecs.get(count - 1).getType() != JoinType.Semi
- && joinSpecs.get(count - 1).getType() != JoinType.Anti))
+ && joinSpecs.get(count - 1).getType() != JoinType.Anti
+ && !joinSpecs.get(count - 1).isSingleValueOnly()))
return null;
boolean[] vector = new boolean[count];
@@ -437,13 +438,15 @@ public class JoinCompiler {
private final JoinType type;
private final List<ComparisonParseNode> onConditions;
private final JoinTable joinTable;
+ private final boolean singleValueOnly;
private Set<TableRef> dependencies;
private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable,
- ColumnResolver resolver) throws SQLException {
+ boolean singleValueOnly, ColumnResolver resolver) throws SQLException {
this.type = type;
this.onConditions = new ArrayList<ComparisonParseNode>();
this.joinTable = joinTable;
+ this.singleValueOnly = singleValueOnly;
this.dependencies = new HashSet<TableRef>();
OnNodeVisitor visitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable);
onNode.accept(visitor);
@@ -461,6 +464,10 @@ public class JoinCompiler {
return joinTable;
}
+ public boolean isSingleValueOnly() {
+ return singleValueOnly;
+ }
+
public Set<TableRef> getDependencies() {
return dependencies;
}
@@ -1177,7 +1184,7 @@ public class JoinCompiler {
if (lhs == lhsReplace && rhs == rhsReplace)
return joinNode;
- return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode());
+ return NODE_FACTORY.join(joinNode.getType(), lhsReplace, rhsReplace, joinNode.getOnNode(), joinNode.isSingleValueOnly());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 52abb9e..a2dc5b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -225,7 +225,7 @@ public class QueryCompiler {
if (i < count - 1) {
fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
}
- subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters);
+ subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, clientProjector, hasFilters);
}
if (needsProject) {
TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
@@ -296,7 +296,7 @@ public class QueryCompiler {
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
- return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())});
+ return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), clientProjector, lhsJoin.hasFilters())});
}
// Do not support queries like "A right join B left join C" with hash-joins.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 6428155..4b37259 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -25,9 +25,12 @@ import java.util.List;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.AndParseNode;
+import org.apache.phoenix.parse.ArrayAllComparisonNode;
+import org.apache.phoenix.parse.ArrayAnyComparisonNode;
import org.apache.phoenix.parse.BooleanParseNodeVisitor;
import org.apache.phoenix.parse.ColumnParseNode;
import org.apache.phoenix.parse.ComparisonParseNode;
@@ -36,6 +39,7 @@ import org.apache.phoenix.parse.ExistsParseNode;
import org.apache.phoenix.parse.InParseNode;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.ParseNodeRewriter;
@@ -129,26 +133,32 @@ public class SubqueryRewriter extends ParseNodeRewriter {
@Override
public ParseNode visitLeave(InParseNode node, List<ParseNode> l) throws SQLException {
+ boolean isTopNode = topNode == node;
+ if (isTopNode) {
+ topNode = null;
+ }
+
SubqueryParseNode subqueryNode = (SubqueryParseNode) l.get(1);
SelectStatement subquery = subqueryNode.getSelectNode();
String rhsTableAlias = ParseNodeFactory.createTempAlias();
- List<AliasedNode> selectNodes = fixAliasedNodes(subquery.getSelect());
+ List<AliasedNode> selectNodes = fixAliasedNodes(subquery.getSelect(), true);
subquery = NODE_FACTORY.select(subquery, true, selectNodes);
ParseNode onNode = getJoinConditionNode(l.get(0), selectNodes, rhsTableAlias);
TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
- JoinType joinType = topNode == node ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
- ParseNode ret = topNode == node ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
- tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode);
-
- if (topNode == node) {
- topNode = null;
- }
+ JoinType joinType = isTopNode ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
+ ParseNode ret = isTopNode ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
+ tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false);
return ret;
}
@Override
public ParseNode visitLeave(ExistsParseNode node, List<ParseNode> l) throws SQLException {
+ boolean isTopNode = topNode == node;
+ if (isTopNode) {
+ topNode = null;
+ }
+
SubqueryParseNode subqueryNode = (SubqueryParseNode) l.get(0);
SelectStatement subquery = subqueryNode.getSelectNode();
String rhsTableAlias = ParseNodeFactory.createTempAlias();
@@ -169,19 +179,20 @@ public class SubqueryRewriter extends ParseNodeRewriter {
subquery = NODE_FACTORY.select(subquery, true, selectNodes, where);
ParseNode onNode = conditionExtractor.getJoinCondition();
TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
- JoinType joinType = topNode == node ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
- ParseNode ret = topNode == node ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
- tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode);
-
- if (topNode == node) {
- topNode = null;
- }
+ JoinType joinType = isTopNode ? (node.isNegate() ? JoinType.Anti : JoinType.Semi) : JoinType.Left;
+ ParseNode ret = isTopNode ? null : NODE_FACTORY.isNull(NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null), !node.isNegate());
+ tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false);
return ret;
}
@Override
public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> l) throws SQLException {
+ boolean isTopNode = topNode == node;
+ if (isTopNode) {
+ topNode = null;
+ }
+
ParseNode secondChild = l.get(1);
if (!(secondChild instanceof SubqueryParseNode)) {
return super.visitLeave(node, l);
@@ -200,12 +211,9 @@ public class SubqueryRewriter extends ParseNodeRewriter {
return super.visitLeave(node, l);
}
- if (!subquery.isAggregate() || !subquery.getGroupBy().isEmpty()) {
- //TODO add runtime singleton check or add a "singleton" aggregate funtion
- throw new SQLFeatureNotSupportedException("Do not support non-aggregate or groupby subquery in comparison.");
- }
-
ParseNode rhsNode = null;
+ boolean isGroupby = !subquery.getGroupBy().isEmpty();
+ boolean isAggregate = subquery.isAggregate();
List<AliasedNode> aliasedNodes = subquery.getSelect();
if (aliasedNodes.size() == 1) {
rhsNode = aliasedNodes.get(0).getNode();
@@ -221,28 +229,139 @@ public class SubqueryRewriter extends ParseNodeRewriter {
List<AliasedNode> selectNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + 1);
selectNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), rhsNode));
selectNodes.addAll(additionalSelectNodes);
- List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size());
- for (AliasedNode aliasedNode : additionalSelectNodes) {
- groupbyNodes.add(aliasedNode.getNode());
+
+ if (!isAggregate) {
+ subquery = NODE_FACTORY.select(subquery, subquery.isDistinct(), selectNodes, where);
+ } else {
+ List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + subquery.getGroupBy().size());
+ for (AliasedNode aliasedNode : additionalSelectNodes) {
+ groupbyNodes.add(aliasedNode.getNode());
+ }
+ groupbyNodes.addAll(subquery.getGroupBy());
+ subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
}
- subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
ParseNode onNode = conditionExtractor.getJoinCondition();
TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
- JoinType joinType = topNode == node ? JoinType.Inner : JoinType.Left;
+ JoinType joinType = isTopNode ? JoinType.Inner : JoinType.Left;
ParseNode ret = NODE_FACTORY.comparison(node.getFilterOp(), l.get(0), NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null));
- tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode);
+ tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, !isAggregate || isGroupby);
- if (topNode == node) {
+ return ret;
+ }
+
+ @Override
+ public ParseNode visitLeave(ArrayAnyComparisonNode node, List<ParseNode> l) throws SQLException {
+ List<ParseNode> children = leaveArrayComparisonNode(node, l);
+ if (children == l)
+ return super.visitLeave(node, l);
+
+ node = NODE_FACTORY.arrayAny(children.get(0), (ComparisonParseNode) children.get(1));
+ return node;
+ }
+
+ @Override
+ public ParseNode visitLeave(ArrayAllComparisonNode node, List<ParseNode> l) throws SQLException {
+ List<ParseNode> children = leaveArrayComparisonNode(node, l);
+ if (children == l)
+ return super.visitLeave(node, l);
+
+ node = NODE_FACTORY.arrayAll(children.get(0), (ComparisonParseNode) children.get(1));
+ return node;
+ }
+
+ protected List<ParseNode> leaveArrayComparisonNode(ParseNode node, List<ParseNode> l) throws SQLException {
+ boolean isTopNode = topNode == node;
+ if (isTopNode) {
topNode = null;
}
+
+ ParseNode firstChild = l.get(0);
+ if (!(firstChild instanceof SubqueryParseNode)) {
+ return l;
+ }
- return ret;
+ SubqueryParseNode subqueryNode = (SubqueryParseNode) firstChild;
+ SelectStatement subquery = subqueryNode.getSelectNode();
+ String rhsTableAlias = ParseNodeFactory.createTempAlias();
+ JoinConditionExtractor conditionExtractor = new JoinConditionExtractor(subquery, resolver, connection, rhsTableAlias);
+ ParseNode where = subquery.getWhere() == null ? null : subquery.getWhere().accept(conditionExtractor);
+ if (where == subquery.getWhere()) { // non-correlated any/all comparison subquery
+ return l;
+ }
+
+ ParseNode rhsNode = null;
+ boolean isNonGroupByAggregate = subquery.getGroupBy().isEmpty() && subquery.isAggregate();
+ List<AliasedNode> aliasedNodes = subquery.getSelect();
+ String derivedTableAlias = null;
+ if (!subquery.getGroupBy().isEmpty()) {
+ derivedTableAlias = ParseNodeFactory.createTempAlias();
+ aliasedNodes = fixAliasedNodes(aliasedNodes, false);
+ }
+
+ if (aliasedNodes.size() == 1) {
+ rhsNode = derivedTableAlias == null ? aliasedNodes.get(0).getNode() : NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), aliasedNodes.get(0).getAlias(), null);
+ } else {
+ List<ParseNode> nodes = Lists.<ParseNode> newArrayListWithExpectedSize(aliasedNodes.size());
+ for (AliasedNode aliasedNode : aliasedNodes) {
+ nodes.add(derivedTableAlias == null ? aliasedNode.getNode() : NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), aliasedNode.getAlias(), null));
+ }
+ rhsNode = NODE_FACTORY.rowValueConstructor(nodes);
+ }
+
+ if (!isNonGroupByAggregate) {
+ rhsNode = NODE_FACTORY.function(DistinctValueAggregateFunction.NAME, Collections.singletonList(rhsNode));
+ }
+
+ List<AliasedNode> additionalSelectNodes = conditionExtractor.getAdditionalSelectNodes();
+ List<AliasedNode> selectNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size() + 1);
+ selectNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), rhsNode));
+ selectNodes.addAll(additionalSelectNodes);
+ List<ParseNode> groupbyNodes = Lists.newArrayListWithExpectedSize(additionalSelectNodes.size());
+ for (AliasedNode aliasedNode : additionalSelectNodes) {
+ groupbyNodes.add(aliasedNode.getNode());
+ }
+
+ if (derivedTableAlias == null) {
+ subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
+ } else {
+ List<ParseNode> derivedTableGroupBy = Lists.newArrayListWithExpectedSize(subquery.getGroupBy().size() + groupbyNodes.size());
+ derivedTableGroupBy.addAll(subquery.getGroupBy());
+ derivedTableGroupBy.addAll(groupbyNodes);
+ List<AliasedNode> derivedTableSelect = Lists.newArrayListWithExpectedSize(aliasedNodes.size() + selectNodes.size() - 1);
+ derivedTableSelect.addAll(aliasedNodes);
+ for (int i = 1; i < selectNodes.size(); i++) {
+ AliasedNode aliasedNode = selectNodes.get(i);
+ String alias = ParseNodeFactory.createTempAlias();
+ derivedTableSelect.add(NODE_FACTORY.aliasedNode(alias, aliasedNode.getNode()));
+ aliasedNode = NODE_FACTORY.aliasedNode(aliasedNode.getAlias(), NODE_FACTORY.column(NODE_FACTORY.table(null, derivedTableAlias), alias, null));
+ selectNodes.set(i, aliasedNode);
+ groupbyNodes.set(i - 1, aliasedNode.getNode());
+ }
+ SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, derivedTableSelect, where, derivedTableGroupBy, true);
+ subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, subquery.hasSequence());
+ }
+
+ ParseNode onNode = conditionExtractor.getJoinCondition();
+ TableNode rhsTable = NODE_FACTORY.derivedTable(rhsTableAlias, subquery);
+ JoinType joinType = isTopNode ? JoinType.Inner : JoinType.Left;
+ tableNode = NODE_FACTORY.join(joinType, tableNode, rhsTable, onNode, false);
+
+ firstChild = NODE_FACTORY.column(NODE_FACTORY.table(null, rhsTableAlias), selectNodes.get(0).getAlias(), null);
+ if (isNonGroupByAggregate) {
+ firstChild = NODE_FACTORY.upsertStmtArrayNode(Collections.singletonList(firstChild));
+ }
+ ComparisonParseNode secondChild = (ComparisonParseNode) l.get(1);
+ secondChild = NODE_FACTORY.comparison(secondChild.getFilterOp(), secondChild.getLHS(), NODE_FACTORY.elementRef(Lists.newArrayList(firstChild, NODE_FACTORY.literal(1))));
+
+ return Lists.newArrayList(firstChild, secondChild);
}
- private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes) {
- List<AliasedNode> normNodes = Lists.<AliasedNode> newArrayListWithExpectedSize(nodes.size() + 1);
- normNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), LiteralParseNode.ONE));
+ private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) {
+ List<AliasedNode> normNodes = Lists.<AliasedNode> newArrayListWithExpectedSize(nodes.size() + (addSelectOne ? 1 : 0));
+ if (addSelectOne) {
+ normNodes.add(NODE_FACTORY.aliasedNode(ParseNodeFactory.createTempAlias(), LiteralParseNode.ONE));
+ }
for (int i = 0; i < nodes.size(); i++) {
AliasedNode aliasedNode = nodes.get(i);
normNodes.add(NODE_FACTORY.aliasedNode(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 02fc6e3..8e0d42d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -185,6 +185,7 @@ public class HashJoinRegionScanner implements RegionScanner {
if (postFilter != null) {
for (Iterator<Tuple> iter = resultQueue.iterator(); iter.hasNext();) {
Tuple t = iter.next();
+ postFilter.reset();
ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
try {
if (!postFilter.evaluate(t, tempPtr)) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 1ac9d68..bb3940c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -395,6 +395,7 @@ public class HashJoinPlan implements QueryPlan {
private final int index;
private final QueryPlan plan;
private final List<Expression> hashExpressions;
+ private final boolean singleValueOnly;
private final Expression keyRangeLhsExpression;
private final Expression keyRangeRhsExpression;
private final TupleProjector clientProjector;
@@ -402,12 +403,14 @@ public class HashJoinPlan implements QueryPlan {
public HashSubPlan(int index, QueryPlan subPlan,
List<Expression> hashExpressions,
+ boolean singleValueOnly,
Expression keyRangeLhsExpression,
Expression keyRangeRhsExpression,
TupleProjector clientProjector, boolean hasFilters) {
this.index = index;
this.plan = subPlan;
this.hashExpressions = hashExpressions;
+ this.singleValueOnly = singleValueOnly;
this.keyRangeLhsExpression = keyRangeLhsExpression;
this.keyRangeRhsExpression = keyRangeRhsExpression;
this.clientProjector = clientProjector;
@@ -424,7 +427,7 @@ public class HashJoinPlan implements QueryPlan {
ServerCache cache = null;
if (hashExpressions != null) {
cache = parent.hashClient.addHashCache(ranges, plan.iterator(),
- clientProjector, plan.getEstimatedSize(), hashExpressions, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
+ clientProjector, plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
long endTime = System.currentTimeMillis();
boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
index 85eb735..dd23534 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java
@@ -37,23 +37,25 @@ public class ArrayConstructorExpression extends BaseCompoundExpression {
// store the offset postion in this. Later based on the total size move this to a byte[]
// and serialize into byte stream
private int[] offsetPos;
+
+ public ArrayConstructorExpression() {
+ }
public ArrayConstructorExpression(List<Expression> children, PDataType baseType) {
super(children);
init(baseType);
+ }
+
+ private void init(PDataType baseType) {
+ this.baseType = baseType;
+ elements = new Object[getChildren().size()];
estimatedSize = PArrayDataType.estimateSize(this.children.size(), this.baseType);
if (!this.baseType.isFixedWidth()) {
offsetPos = new int[children.size()];
byteStream = new TrustedByteArrayOutputStream(estimatedSize);
} else {
byteStream = new TrustedByteArrayOutputStream(estimatedSize);
- }
-
- }
-
- private void init(PDataType baseType) {
- this.baseType = baseType;
- elements = new Object[getChildren().size()];
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 1566d64..3219cfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.function.ConvertTimezoneFunction;
import org.apache.phoenix.expression.function.CountAggregateFunction;
import org.apache.phoenix.expression.function.DecodeFunction;
import org.apache.phoenix.expression.function.DistinctCountAggregateFunction;
+import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
import org.apache.phoenix.expression.function.EncodeFunction;
import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
import org.apache.phoenix.expression.function.FirstValueFunction;
@@ -148,6 +149,7 @@ public enum ExpressionType {
LowerFunction(LowerFunction.class),
TrimFunction(TrimFunction.class),
DistinctCountAggregateFunction(DistinctCountAggregateFunction.class),
+ DistinctValueAggregateFunction(DistinctValueAggregateFunction.class),
PercentileContAggregateFunction(PercentileContAggregateFunction.class),
PercentRankAggregateFunction(PercentRankAggregateFunction.class),
StddevPopFunction(StddevPopFunction.class),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
new file mode 100644
index 0000000..fefb077
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.PArrayDataType;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class DistinctValueClientAggregator extends DistinctValueWithCountClientAggregator {
+ private final PDataType valueType;
+ private final PDataType resultType;
+
+ public DistinctValueClientAggregator(SortOrder sortOrder, PDataType valueType, PDataType resultType) {
+ super(sortOrder);
+ this.valueType = valueType;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (buffer == null || buffer.length == 0) {
+ Object[] values = new Object[valueVsCount.size()];
+ int i = 0;
+ for (ImmutableBytesPtr key : valueVsCount.keySet()) {
+ values[i++] = valueType.toObject(key, sortOrder);
+ }
+ PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values);
+ buffer = resultType.toBytes(array, sortOrder);
+ }
+ ptr.set(buffer);
+ return true;
+ }
+
+ @Override
+ protected PDataType getResultDataType() {
+ return resultType;
+ }
+
+ @Override
+ protected int getBufferLength() {
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 4bd2a17..281879e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -54,7 +54,7 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
private int compressThreshold;
private byte[] buffer = null;
- private Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
+ protected Map<ImmutableBytesPtr, Integer> valueVsCount = new HashMap<ImmutableBytesPtr, Integer>();
public DistinctValueWithCountServerAggregator(Configuration conf) {
super(SortOrder.getDefault());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java
new file mode 100644
index 0000000..6877409
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctValueAggregateFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.DistinctValueClientAggregator;
+import org.apache.phoenix.expression.aggregator.DistinctValueWithCountClientAggregator;
+import org.apache.phoenix.expression.aggregator.DistinctValueWithCountServerAggregator;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.PDataType;
+
+@BuiltInFunction(name=DistinctValueAggregateFunction.NAME, args= {@Argument()} )
+public class DistinctValueAggregateFunction extends DistinctValueWithCountAggregateFunction {
+ public static final String NAME = "COLLECTDISTINCT";
+
+ public DistinctValueAggregateFunction() {
+ }
+
+ public DistinctValueAggregateFunction(List<Expression> children) {
+ super(children);
+ }
+
+ @Override
+ public Aggregator newServerAggregator(Configuration conf) {
+ return new DistinctValueWithCountServerAggregator(conf);
+ }
+
+ @Override
+ public DistinctValueWithCountClientAggregator newClientAggregator() {
+ PDataType baseType = getAggregatorExpression().getDataType().isArrayType() ? PDataType.VARBINARY : getAggregatorExpression().getDataType();
+ PDataType resultType = PDataType.fromTypeId(baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE);
+ return new DistinctValueClientAggregator(getAggregatorExpression().getSortOrder(), baseType, resultType);
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ PDataType baseType = getAggregatorExpression().getDataType().isArrayType() ? PDataType.VARBINARY : getAggregatorExpression().getDataType();
+ return PDataType.fromTypeId(baseType.getSqlType() + PDataType.ARRAY_TYPE_BASE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 863b535..b6245ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -69,16 +69,16 @@ public class HashCacheClient {
* @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
* size
*/
- public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+ public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
/**
* Serialize and compress hashCacheTable
*/
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- serialize(ptr, iterator, projector, estimatedSize, onExpressions, keyRangeRhsExpression, keyRangeRhsValues);
+ serialize(ptr, iterator, projector, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
}
- private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
+ private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<ImmutableBytesWritable> keyRangeRhsValues) throws SQLException {
long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
estimatedSize = Math.min(estimatedSize, maxSize);
if (estimatedSize > Integer.MAX_VALUE) {
@@ -93,6 +93,7 @@ public class HashCacheClient {
WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal());
expression.write(out);
}
+ out.writeBoolean(singleValueOnly);
int exprSize = baOut.size() + Bytes.SIZEOF_INT;
out.writeInt(exprSize);
int nRows = 0;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 257fa1f..697d862 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -27,9 +27,10 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.xerial.snappy.Snappy;
-
import org.apache.phoenix.cache.HashCache;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -67,6 +68,7 @@ public class HashCacheFactory implements ServerCacheFactory {
private class HashCacheImpl implements HashCache {
private final Map<ImmutableBytesPtr,List<Tuple>> hashCache;
private final MemoryChunk memoryChunk;
+ private final boolean singleValueOnly;
private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) {
try {
@@ -83,6 +85,7 @@ public class HashCacheFactory implements ServerCacheFactory {
expression.readFields(dataInput);
onExpressions.add(expression);
}
+ this.singleValueOnly = dataInput.readBoolean();
int exprSize = dataInput.readInt();
offset += exprSize;
int nRows = dataInput.readInt();
@@ -117,8 +120,14 @@ public class HashCacheFactory implements ServerCacheFactory {
}
@Override
- public List<Tuple> get(ImmutableBytesPtr hashKey) {
- return hashCache.get(hashKey);
+ public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException {
+ List<Tuple> ret = hashCache.get(hashKey);
+ if (singleValueOnly && ret != null && ret.size() > 1) {
+ SQLException ex = new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+ ServerUtil.throwIOException(ex.getMessage(), ex);
+ }
+
+ return ret;
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
index a51ca5c..5dd13f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/JoinTableNode.java
@@ -43,13 +43,15 @@ public class JoinTableNode extends TableNode {
private final TableNode lhs;
private final TableNode rhs;
private final ParseNode onNode;
+ private final boolean singleValueOnly;
- JoinTableNode(JoinType type, TableNode lhs, TableNode rhs, ParseNode onNode) {
+ JoinTableNode(JoinType type, TableNode lhs, TableNode rhs, ParseNode onNode, boolean singleValueOnly) {
super(null);
this.type = type;
this.lhs = lhs;
this.rhs = rhs;
this.onNode = onNode;
+ this.singleValueOnly = singleValueOnly;
}
public JoinType getType() {
@@ -67,6 +69,10 @@ public class JoinTableNode extends TableNode {
public ParseNode getOnNode() {
return onNode;
}
+
+ public boolean isSingleValueOnly() {
+ return singleValueOnly;
+ }
@Override
public <T> T accept(TableNodeVisitor<T> visitor) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 85fd978..4f5e7ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -417,7 +417,7 @@ public class ParseNodeFactory {
public TableNode table(TableNode table, List<JoinPartNode> parts) {
for (JoinPartNode part : parts) {
- table = new JoinTableNode(part.getType(), table, part.getTable(), part.getOnNode());
+ table = new JoinTableNode(part.getType(), table, part.getTable(), part.getOnNode(), false);
}
return table;
}
@@ -426,8 +426,8 @@ public class ParseNodeFactory {
return new JoinPartNode(type, onNode, table);
}
- public JoinTableNode join(JoinType type, TableNode lhs, TableNode rhs, ParseNode on) {
- return new JoinTableNode(type, lhs, rhs, on);
+ public JoinTableNode join(JoinType type, TableNode lhs, TableNode rhs, ParseNode on, boolean singleValueOnly) {
+ return new JoinTableNode(type, lhs, rhs, on, singleValueOnly);
}
public DerivedTableNode derivedTable (String alias, SelectStatement select) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/49ec34be/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index 06ac1c6..338a45b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -561,7 +561,7 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
if (lhsNode == normLhsNode && rhsNode == normRhsNode && onNode == normOnNode)
return joinNode;
- return NODE_FACTORY.join(joinNode.getType(), normLhsNode, normRhsNode, normOnNode);
+ return NODE_FACTORY.join(joinNode.getType(), normLhsNode, normRhsNode, normOnNode, joinNode.isSingleValueOnly());
}
@Override