You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2022/09/27 20:08:32 UTC

Change in asterixdb[master]: WIP: join hints

From Ali Alsuliman <al...@gmail.com>:

Ali Alsuliman has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17236 )


Change subject: WIP: join hints
......................................................................

WIP: join hints

Change-Id: I7ec73409c9c6fddba01ebe6a9d4eee46e94dd0de
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/HashJoinExpressionAnnotation.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
8 files changed, 282 insertions(+), 75 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/36/17236/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index 4a37007..4ae566c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -319,11 +319,10 @@
     private void setAnnotation(AbstractFunctionCallExpression afcExpr, IExpressionAnnotation anno) {
         FunctionIdentifier fi = afcExpr.getFunctionIdentifier();
         List<Mutable<ILogicalExpression>> arguments = afcExpr.getArguments();
-        int argumentCount = arguments.size();
 
         if (fi.equals(AlgebricksBuiltinFunctions.AND)) {
-            for (int i = 0; i < argumentCount; i++) {
-                ILogicalExpression argument = arguments.get(i).getValue();
+            for (Mutable<ILogicalExpression> iLogicalExpressionMutable : arguments) {
+                ILogicalExpression argument = iLogicalExpressionMutable.getValue();
                 AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) argument;
                 expr.putAnnotation(anno);
             }
@@ -484,8 +483,7 @@
                 changes = false;
                 List<LogicalVariable> vars = new ArrayList<>();
                 exp.getUsedVariables(vars);
-                Set<LogicalVariable> set = new LinkedHashSet<>();
-                set.addAll(vars);
+                Set<LogicalVariable> set = new LinkedHashSet<>(vars);
                 if (set.size() < vars.size()) {
                     // walk thru vars and find the first instance of the duplicate
                     for (int i = 0; i < vars.size() - 1; i++) {
@@ -563,9 +561,9 @@
     private boolean doAllDataSourcesHaveSamples(
             List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps,
             IOptimizationContext context) throws AlgebricksException {
-        for (int i = 0; i < emptyTupleAndDataSourceOps.size(); i++) {
-            if (emptyTupleAndDataSourceOps.get(i).getSecond() != null) {
-                DataSourceScanOperator scanOp = emptyTupleAndDataSourceOps.get(i).getSecond();
+        for (Pair<EmptyTupleSourceOperator, DataSourceScanOperator> emptyTupleAndDataSourceOp : emptyTupleAndDataSourceOps) {
+            if (emptyTupleAndDataSourceOp.getSecond() != null) {
+                DataSourceScanOperator scanOp = emptyTupleAndDataSourceOp.getSecond();
                 Index index = joinEnum.getStatsHandle().findSampleIndex(scanOp, context);
                 if (index == null) {
                     return false;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index 306bad9..aa32cee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -174,8 +174,8 @@
 
     public ILogicalOperator findLeafInput(List<LogicalVariable> logicalVars) throws AlgebricksException {
         Set<LogicalVariable> vars = new HashSet<>();
-        for (int pos = 0; pos < emptyTupleAndDataSourceOps.size(); pos++) {
-            EmptyTupleSourceOperator emptyOp = emptyTupleAndDataSourceOps.get(pos).getFirst();
+        for (Pair<EmptyTupleSourceOperator, DataSourceScanOperator> emptyTupleAndDataSourceOp : emptyTupleAndDataSourceOps) {
+            EmptyTupleSourceOperator emptyOp = emptyTupleAndDataSourceOp.getFirst();
             ILogicalOperator op = joinLeafInputsHashMap.get(emptyOp);
             vars.clear();
             // this is expensive to do. So store this once and reuse
@@ -246,7 +246,7 @@
         return eqPredFound ? andExpr : null;
     }
 
-    public HashJoinExpressionAnnotation.BuildSide findHashJoinHint(List<Integer> newJoinConditions) {
+    public HashJoinExpressionAnnotation findHashJoinHint(List<Integer> newJoinConditions) {
         for (int i : newJoinConditions) {
             JoinCondition jc = joinConditions.get(i);
             if (jc.comparisonType != JoinCondition.comparisonOp.OP_EQ) {
@@ -257,14 +257,14 @@
                 AbstractFunctionCallExpression AFCexpr = (AbstractFunctionCallExpression) expr;
                 HashJoinExpressionAnnotation hjea = AFCexpr.getAnnotation(HashJoinExpressionAnnotation.class);
                 if (hjea != null) {
-                    return hjea.getBuildSide();
+                    return hjea;
                 }
             }
         }
         return null;
     }
 
-    public BroadcastExpressionAnnotation.BroadcastSide findBroadcastHashJoinHint(List<Integer> newJoinConditions) {
+    public BroadcastExpressionAnnotation findBroadcastHashJoinHint(List<Integer> newJoinConditions) {
         for (int i : newJoinConditions) {
             JoinCondition jc = joinConditions.get(i);
             if (jc.comparisonType != JoinCondition.comparisonOp.OP_EQ) {
@@ -275,7 +275,7 @@
                 AbstractFunctionCallExpression AFCexpr = (AbstractFunctionCallExpression) expr;
                 BroadcastExpressionAnnotation bcasthjea = AFCexpr.getAnnotation(BroadcastExpressionAnnotation.class);
                 if (bcasthjea != null) {
-                    return bcasthjea.getBroadcastSide();
+                    return bcasthjea;
                 }
             }
         }
@@ -302,7 +302,7 @@
         for (int i = 1; i <= this.numberOfTerms; i++) {
             if (name.equals(jnArray[i].datasetNames.get(0))) {
                 return i;
-            } else if (name.equals(jnArray[i].alias)) {
+            } else if (name.equals(jnArray[i].aliases.get(0))) {
                 return i;
             }
         }
@@ -582,6 +582,10 @@
                     jn.datasetNames.addAll(jnI.datasetNames);
                     jn.datasetNames.addAll(jnJ.datasetNames);
                     Collections.sort(jn.datasetNames);
+                    jn.aliases = new ArrayList<>();
+                    jn.aliases.addAll(jnI.aliases);
+                    jn.aliases.addAll(jnJ.aliases);
+                    Collections.sort(jn.aliases);
                     jn.size = jnI.size + jnJ.size;
                     jn.cardinality = jn.computeJoinCardinality();
 
@@ -591,7 +595,7 @@
 
                 JoinNode jnIJ = jnArray[addPlansToThisJn];
                 jnIJ.jnArrayIndex = addPlansToThisJn;
-                jnIJ.addMultiDatasetPlans(jnI, jnJ, level);
+                jnIJ.addMultiDatasetPlans(jnI, jnJ);
                 if (forceJoinOrderMode) {
                     break;
                 }
@@ -668,8 +672,8 @@
             DataSourceScanOperator scanOp = emptyTupleAndDataSourceOps.get(i - 1).getSecond();
             if (scanOp != null) {
                 DataSourceId id = (DataSourceId) scanOp.getDataSource().getId();
-                jn.alias = findAlias(scanOp);
                 jn.datasetNames = new ArrayList<>(Collections.singleton(id.getDatasourceName()));
+                jn.aliases = new ArrayList<>(Collections.singleton(findAlias(scanOp)));
                 Index.SampleIndexDetails idxDetails;
                 Index index = stats.findSampleIndex(scanOp, optCtx);
                 if (index != null) {
@@ -695,6 +699,7 @@
             } else {
                 // could be unnest or assign
                 jn.datasetNames = new ArrayList<>(Collections.singleton("unnestOrAssign"));
+                jn.aliases = new ArrayList<>(Collections.singleton("unnestOrAssign"));
                 jn.origCardinality = jn.cardinality = findInListCard(leafInput);
                 // just a guess
                 jn.size = 10;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
index cf4fcad..a6531d2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
@@ -68,7 +68,7 @@
     protected int datasetBits; // this is bitmap of all the keyspaceBits present in this joinNode
     protected List<Integer> datasetIndexes;
     protected List<String> datasetNames;
-    protected String alias;
+    protected List<String> aliases;
     protected int cheapestPlanIndex;
     protected ICost cheapestPlanCost;
     protected double origCardinality; // without any selections
@@ -99,10 +99,7 @@
     }
 
     public boolean IsBaseLevelJoinNode() {
-        if (this.jnArrayIndex <= joinEnum.numberOfTerms) {
-            return true;
-        }
-        return false;
+        return this.jnArrayIndex <= joinEnum.numberOfTerms;
     }
 
     public boolean IsHigherLevelJoinNode() {
@@ -171,8 +168,8 @@
         return rightJn;
     }
 
-    public String getAlias() {
-        return alias;
+    public List<String> getAliases() {
+        return aliases;
     }
 
     public List<String> getDatasetNames() {
@@ -232,9 +229,8 @@
 
         // Now call the rewritePre code
         IntroduceJoinAccessMethodRule tmp = new IntroduceJoinAccessMethodRule();
-        boolean temp = tmp.checkApplicable(new MutableObject<>(joinEnum.localJoinOp), joinEnum.optCtx);
 
-        return temp;
+        return tmp.checkApplicable(new MutableObject<>(joinEnum.localJoinOp), joinEnum.optCtx);
     }
 
     /** one is a subset of two */
@@ -340,9 +336,7 @@
             Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) throws AlgebricksException {
         // Skip indexes with selectivity greater than 0.1, add the SKIP_SECONDARY_INDEX annotation to its expression.
         double sel;
-        Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
-        while (amIt.hasNext()) {
-            Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry = amIt.next();
+        for (Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry : analyzedAMs.entrySet()) {
             AccessMethodAnalysisContext analysisCtx = amEntry.getValue();
             Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt =
                     analysisCtx.getIteratorForIndexExprsAndVars();
@@ -381,7 +375,8 @@
         }
     }
 
-    protected int buildHashJoinPlan(JoinNode leftJn, JoinNode rightJn, ILogicalExpression hashJoinExpr) {
+    protected int buildHashJoinPlan(JoinNode leftJn, JoinNode rightJn, ILogicalExpression hashJoinExpr,
+            HashJoinExpressionAnnotation hintHashJoin) {
         List<PlanNode> allPlans = joinEnum.allPlans;
         PlanNode pn;
         ICost hjCost, childCosts, totalCost;
@@ -405,7 +400,8 @@
             return PlanNode.NO_PLAN;
         }
 
-        if (rightJn.cardinality * rightJn.size <= leftJn.cardinality * leftJn.size || joinEnum.forceJoinOrderMode
+        if (rightJn.cardinality * rightJn.size <= leftJn.cardinality * leftJn.size || hintHashJoin != null
+                || joinEnum.forceJoinOrderMode
                 || !joinEnum.queryPlanShape.equals(AlgebricksConfig.QUERY_PLAN_SHAPE_ZIGZAG)) {
             // We want to build with the smaller side.
             hjCost = joinEnum.getCostMethodsHandle().costHashJoin(this);
@@ -419,6 +415,9 @@
                 pn.planIndexes[0] = leftPlan;
                 pn.planIndexes[1] = rightPlan;
                 pn.joinOp = PlanNode.JoinMethod.HYBRID_HASH_JOIN; // need to check that all the conditions have equality predicates ONLY.
+                if (hintHashJoin != null) {
+                    hintHashJoin.setBuildSide(HashJoinExpressionAnnotation.BuildSide.RIGHT);
+                }
                 pn.side = HashJoinExpressionAnnotation.BuildSide.RIGHT;
                 pn.joinExpr = hashJoinExpr;
                 pn.opCost = hjCost;
@@ -437,7 +436,8 @@
         return PlanNode.NO_PLAN;
     }
 
-    protected int buildBroadcastHashJoinPlan(JoinNode leftJn, JoinNode rightJn, ILogicalExpression hashJoinExpr) {
+    protected int buildBroadcastHashJoinPlan(JoinNode leftJn, JoinNode rightJn, ILogicalExpression hashJoinExpr,
+            BroadcastExpressionAnnotation hintBroadcastHashJoin) {
         List<PlanNode> allPlans = joinEnum.allPlans;
         PlanNode pn;
         ICost bcastHjCost, childCosts, totalCost;
@@ -461,7 +461,8 @@
             return PlanNode.NO_PLAN;
         }
 
-        if (rightJn.cardinality * rightJn.size <= leftJn.cardinality * leftJn.size || joinEnum.forceJoinOrderMode
+        if (rightJn.cardinality * rightJn.size <= leftJn.cardinality * leftJn.size || hintBroadcastHashJoin != null
+                || joinEnum.forceJoinOrderMode
                 || !joinEnum.queryPlanShape.equals(AlgebricksConfig.QUERY_PLAN_SHAPE_ZIGZAG)) {
             // We want to broadcast and build with the smaller side.
             bcastHjCost = joinEnum.getCostMethodsHandle().costBroadcastHashJoin(this);
@@ -475,6 +476,9 @@
                 pn.planIndexes[0] = leftPlan;
                 pn.planIndexes[1] = rightPlan;
                 pn.joinOp = PlanNode.JoinMethod.BROADCAST_HASH_JOIN; // need to check that all the conditions have equality predicates ONLY.
+                if (hintBroadcastHashJoin != null) {
+                    hintBroadcastHashJoin.setBroadcastSide(BroadcastExpressionAnnotation.BroadcastSide.RIGHT);
+                }
                 pn.side = HashJoinExpressionAnnotation.BuildSide.RIGHT;
                 pn.joinExpr = hashJoinExpr;
                 pn.opCost = bcastHjCost;
@@ -559,11 +563,11 @@
             cpJoinExpr = joinEnum.combineAllConditions(newJoinConditions);
         } else if (hashJoinExpr != null && nestedLoopJoinExpr == null) {
             cpJoinExpr = hashJoinExpr;
-        } else if (hashJoinExpr == null && nestedLoopJoinExpr != null) {
+        } else if (hashJoinExpr == null) {
             cpJoinExpr = nestedLoopJoinExpr;
-        } else if (Objects.equals(hashJoinExpr, nestedLoopJoinExpr) == true) {
+        } else if (Objects.equals(hashJoinExpr, nestedLoopJoinExpr)) {
             cpJoinExpr = hashJoinExpr;
-        } else if (Objects.equals(hashJoinExpr, nestedLoopJoinExpr) == false) {
+        } else if (!Objects.equals(hashJoinExpr, nestedLoopJoinExpr)) {
             ScalarFunctionCallExpression andExpr = new ScalarFunctionCallExpression(
                     BuiltinFunctions.getBuiltinFunctionInfo(AlgebricksBuiltinFunctions.AND));
             andExpr.getArguments().add(new MutableObject<>(hashJoinExpr));
@@ -597,8 +601,7 @@
         return PlanNode.NO_PLAN;
     }
 
-    protected Pair<Integer, ICost> addMultiDatasetPlans(JoinNode leftJn, JoinNode rightJn, int level)
-            throws AlgebricksException {
+    protected Pair<Integer, ICost> addMultiDatasetPlans(JoinNode leftJn, JoinNode rightJn) throws AlgebricksException {
         this.leftJn = leftJn;
         this.rightJn = rightJn;
         ICost noJoinCost = joinEnum.getCostHandle().maxCost();
@@ -632,8 +635,8 @@
         hjPlan = commutativeHjPlan = bcastHjPlan =
                 commutativeBcastHjPlan = nljPlan = commutativeNljPlan = cpPlan = commutativeCpPlan = PlanNode.NO_PLAN;
 
-        HashJoinExpressionAnnotation.BuildSide hintHashJoin = joinEnum.findHashJoinHint(newJoinConditions);
-        BroadcastExpressionAnnotation.BroadcastSide hintBroadcastHashJoin = null;
+        HashJoinExpressionAnnotation hintHashJoin = joinEnum.findHashJoinHint(newJoinConditions);
+        BroadcastExpressionAnnotation hintBroadcastHashJoin = null;
         boolean hintNLJoin = false;
         if (hintHashJoin == null) {
             hintBroadcastHashJoin = joinEnum.findBroadcastHashJoinHint(newJoinConditions);
@@ -647,13 +650,21 @@
         }
 
         if (hintHashJoin != null) {
-            hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr);
-            if (!joinEnum.forceJoinOrderMode && hintHashJoin != HashJoinExpressionAnnotation.BuildSide.RIGHT) {
-                commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+            boolean build = (hintHashJoin.getBuildOrProbe() == HashJoinExpressionAnnotation.BuildOrProbe.BUILD);
+            String buildOrProbeObject = hintHashJoin.getName();
+            if (build
+                    && (rightJn.datasetNames.contains(buildOrProbeObject)
+                            || rightJn.aliases.contains(buildOrProbeObject))
+                    || !build && (leftJn.datasetNames.contains(buildOrProbeObject)
+                            || leftJn.aliases.contains(buildOrProbeObject))) {
+                hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr, hintHashJoin);
+            } else if (!joinEnum.forceJoinOrderMode) {
+                commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr, hintHashJoin);
             }
             if (hjPlan == PlanNode.NO_PLAN && commutativeHjPlan == PlanNode.NO_PLAN) {
                 // Hints are attached to predicates, so newJoinConditions should not be empty, but adding the check to be safe.
-                if (!joinEnum.getJoinConditions().isEmpty() && !newJoinConditions.isEmpty()) {
+                if (!joinEnum.getJoinConditions().isEmpty() && !newJoinConditions.isEmpty()
+                        && hintHashJoin.getBuildSide() == null) {
                     IWarningCollector warningCollector = joinEnum.optCtx.getWarningCollector();
                     if (warningCollector.shouldWarn()) {
                         warningCollector.warn(Warning.of(
@@ -662,9 +673,9 @@
                                 ErrorCode.INAPPLICABLE_HINT, "Hash join hint not applicable and was ignored"));
                     }
                 }
-                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr);
+                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr, null);
                 if (!joinEnum.forceJoinOrderMode) {
-                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr, null);
                 }
                 nljPlan = buildNLJoinPlan(leftJn, rightJn, nestedLoopJoinExpr);
                 if (!joinEnum.forceJoinOrderMode) {
@@ -676,10 +687,12 @@
                 }
             }
         } else if (hintBroadcastHashJoin != null) {
-            bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr);
-            if (!joinEnum.forceJoinOrderMode
-                    && hintBroadcastHashJoin != BroadcastExpressionAnnotation.BroadcastSide.RIGHT) {
-                commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+            String broadcastObject = hintBroadcastHashJoin.getName();
+            if (rightJn.datasetNames.contains(broadcastObject) || rightJn.aliases.contains(broadcastObject)) {
+                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr, hintBroadcastHashJoin);
+            } else if (!joinEnum.forceJoinOrderMode) {
+                commutativeBcastHjPlan =
+                        buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr, hintBroadcastHashJoin);
             }
             if (bcastHjPlan == PlanNode.NO_PLAN && commutativeBcastHjPlan == PlanNode.NO_PLAN) {
                 // Hints are attached to predicates, so newJoinConditions should not be empty, but adding the check to be safe.
@@ -694,9 +707,9 @@
                     }
                 }
 
-                hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr);
+                hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr, null);
                 if (!joinEnum.forceJoinOrderMode) {
-                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr, null);
                 }
                 nljPlan = buildNLJoinPlan(leftJn, rightJn, nestedLoopJoinExpr);
                 if (!joinEnum.forceJoinOrderMode) {
@@ -723,13 +736,13 @@
                                 ErrorCode.INAPPLICABLE_HINT, "Index nested join hint not applicable and was ignored"));
                     }
                 }
-                hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr);
+                hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr, null);
                 if (!joinEnum.forceJoinOrderMode) {
-                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr, null);
                 }
-                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr);
+                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr, null);
                 if (!joinEnum.forceJoinOrderMode) {
-                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr, null);
                 }
                 cpPlan = buildCPJoinPlan(leftJn, rightJn, hashJoinExpr, nestedLoopJoinExpr);
                 if (!joinEnum.forceJoinOrderMode) {
@@ -737,13 +750,13 @@
                 }
             }
         } else {
-            hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr);
+            hjPlan = buildHashJoinPlan(leftJn, rightJn, hashJoinExpr, null);
             if (!joinEnum.forceJoinOrderMode) {
-                commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+                commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, hashJoinExpr, null);
             }
-            bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr);
+            bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, hashJoinExpr, null);
             if (!joinEnum.forceJoinOrderMode) {
-                commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr);
+                commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, hashJoinExpr, null);
             }
             nljPlan = buildNLJoinPlan(leftJn, rightJn, nestedLoopJoinExpr);
             if (!joinEnum.forceJoinOrderMode) {
@@ -779,9 +792,9 @@
         // This will avoid printing JoinNodes that have no plans
         sb.append("Printing Join Node ").append(jnArrayIndex).append('\n');
         sb.append("datasetNames ").append('\n');
-        for (int j = 0; j < datasetNames.size(); j++) {
+        for (String datasetName : datasetNames) {
             // Need to not print newline
-            sb.append(datasetNames.get(j)).append(' ');
+            sb.append(datasetName).append(' ');
         }
         sb.append("datasetIndex ").append('\n');
         for (int j = 0; j < datasetIndexes.size(); j++) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
index 96bc412..28a79e6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
@@ -148,16 +148,15 @@
             }
         }
 
-        double s = 1.0;
+        double s;
         PredicateCardinalityAnnotation pca = afcExpr.getAnnotation(PredicateCardinalityAnnotation.class);
         if (pca != null) {
             s = pca.getSelectivity();
-            sel *= s;
         } else {
             JoinProductivityAnnotation jpa = afcExpr.getAnnotation(JoinProductivityAnnotation.class);
             s = findJoinSelectivity(jpa, afcExpr);
-            sel *= s;
         }
+        sel *= s;
         if (join && s == 1.0) {
             // assume no selectivity was assigned
             joinEnum.singleDatasetPreds.add(afcExpr);
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 6eb05cb..a78e7a3 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -414,6 +414,32 @@
         return new SQLPPParser(text).parseParenthesizedIdentifierList();
     }
 
+    private Pair<HashJoinExpressionAnnotation.BuildOrProbe, String> parseHashJoinParams() throws CompilationException {
+        return parseImpl(new ParseFunction<Pair<HashJoinExpressionAnnotation.BuildOrProbe, String>>() {
+            @Override
+            public  Pair<HashJoinExpressionAnnotation.BuildOrProbe, String> parse() throws ParseException {
+                return SQLPPParser.this.buildOrProbeParenthesizedIdentifier();
+            }
+        });
+    }
+
+    private static Pair<HashJoinExpressionAnnotation.BuildOrProbe, String> parseHashJoinParams(String text) throws CompilationException {
+        return new SQLPPParser(text).parseHashJoinParams();
+    }
+
+    private String parseBroadcastJoinParams() throws CompilationException {
+            return parseImpl(new ParseFunction<String>() {
+                @Override
+                public  String parse() throws ParseException {
+                    return SQLPPParser.this.parenthesizedIdentifier();
+                }
+            });
+        }
+
+    private static String parseBroadcastJoinParams(String text) throws CompilationException {
+        return new SQLPPParser(text).parseBroadcastJoinParams();
+    }
+
     private List<Literal> parseParenthesizedLiteralList() throws CompilationException {
         return parseImpl(new ParseFunction<List<Literal>>() {
             @Override
@@ -692,9 +718,25 @@
             // attach hint to global scope
             return new JoinProductivityAnnotation (productivity, leftSideDataSet);
           case HASH_BROADCAST_JOIN_HINT:
-            return new BroadcastExpressionAnnotation(BroadcastExpressionAnnotation.BroadcastSide.RIGHT);
+            if (hintToken.hintParams == null) {
+              return new BroadcastExpressionAnnotation();
+            }
+            else {
+              // if parameter parsing fails then return hint annotation without parameters
+              onParseErrorReturn = new BroadcastExpressionAnnotation();
+              String name = parseBroadcastJoinParams(hintToken.hintParams);
+              return new BroadcastExpressionAnnotation(name);
+            }
           case HASH_JOIN_HINT:
-            return new HashJoinExpressionAnnotation(HashJoinExpressionAnnotation.BuildSide.RIGHT);
+            if (hintToken.hintParams == null) {
+              return new HashJoinExpressionAnnotation();
+            }
+            else {
+              // if parameter parsing fails then return hint annotation without parameters
+              onParseErrorReturn = new HashJoinExpressionAnnotation();
+              Pair<HashJoinExpressionAnnotation.BuildOrProbe, String> pair = parseHashJoinParams(hintToken.hintParams);
+              return new HashJoinExpressionAnnotation(pair);
+            }
           case INDEXED_NESTED_LOOP_JOIN_HINT:
             if (hintToken.hintParams == null) {
               return IndexedNLJoinExpressionAnnotation.INSTANCE_ANY_INDEX;
@@ -3163,6 +3205,40 @@
   }
 }
 
+Pair<HashJoinExpressionAnnotation.BuildOrProbe, String> buildOrProbeParenthesizedIdentifier() throws ParseException:
+{
+  Pair<HashJoinExpressionAnnotation.BuildOrProbe, String> pair = new Pair<HashJoinExpressionAnnotation.BuildOrProbe, String>(HashJoinExpressionAnnotation.BuildOrProbe.BUILD, "");
+  String ident1 = null;
+  String ident2 = null;
+}
+{
+  ident1 = Identifier() <LEFTPAREN> ident2 = Identifier() <RIGHTPAREN>
+  {
+    // Map the leading keyword to BUILD or PROBE; any other keyword falls back to the default pair (BUILD, "").
+    if (ident1.equals("build")) {
+        return new Pair<HashJoinExpressionAnnotation.BuildOrProbe, String>(HashJoinExpressionAnnotation.BuildOrProbe.BUILD, ident2);
+    }
+    else if (ident1.equals("probe")) {
+            return new Pair<HashJoinExpressionAnnotation.BuildOrProbe, String>(HashJoinExpressionAnnotation.BuildOrProbe.PROBE, ident2);
+    }
+    else {
+        return pair;
+    }
+    return pair;
+  }
+}
+
+String parenthesizedIdentifier() throws ParseException:
+{
+  String ident = null;
+}
+{
+  <LEFTPAREN> ident = Identifier() <RIGHTPAREN>
+  {
+    return ident;
+  }
+}
+
 List<Literal> ParenthesizedLiteralList() throws ParseException:
 {
   List<Literal> list = new ArrayList<Literal>();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
index 79b9e2c..343df8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/BroadcastExpressionAnnotation.java
@@ -25,7 +25,6 @@
     public enum BroadcastSide {
         LEFT,
         RIGHT;
-
         public static BroadcastSide getOppositeSide(BroadcastSide side) {
             switch (side) {
                 case LEFT:
@@ -38,13 +37,33 @@
         }
     }
 
-    private final BroadcastSide side;
+    private final String name;
+    private BroadcastSide side;
+
+    public BroadcastExpressionAnnotation() {
+        this.name = null;
+        this.side = null;
+    }
+
+    public BroadcastExpressionAnnotation(String name) {
+        this.name = name;
+        this.side = null;
+    }
 
     public BroadcastExpressionAnnotation(BroadcastSide side) {
+        this.name = null;
         this.side = Objects.requireNonNull(side);
     }
 
+    public String getName() {
+        return name;
+    }
+
     public BroadcastSide getBroadcastSide() {
         return side;
     }
+
+    public void setBroadcastSide(BroadcastSide side) {
+        this.side = side;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/HashJoinExpressionAnnotation.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/HashJoinExpressionAnnotation.java
index 02ba2db..8b6ce7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/HashJoinExpressionAnnotation.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/HashJoinExpressionAnnotation.java
@@ -21,19 +21,55 @@
 
 import java.util.Objects;
 
+import org.apache.hyracks.algebricks.common.utils.Pair;
+
 public class HashJoinExpressionAnnotation implements IExpressionAnnotation {
+
+    public enum BuildOrProbe {
+        BUILD,
+        PROBE
+    }
+
     public enum BuildSide {
         LEFT,
         RIGHT
     }
 
-    private final BuildSide side;
+    private final BuildOrProbe buildOrProbe;
+    private final String name;
+    private BuildSide side;
+
+    public HashJoinExpressionAnnotation() {
+        this.buildOrProbe = null;
+        this.name = null;
+        this.side = null;
+    }
+
+    public HashJoinExpressionAnnotation(Pair<BuildOrProbe, String> pair) {
+        this.buildOrProbe = pair.getFirst();
+        this.name = pair.getSecond();
+        this.side = null;
+    }
 
     public HashJoinExpressionAnnotation(BuildSide side) {
+        this.buildOrProbe = null;
+        this.name = null;
         this.side = Objects.requireNonNull(side);
     }
 
+    public BuildOrProbe getBuildOrProbe() {
+        return buildOrProbe;
+    }
+
+    public String getName() {
+        return name;
+    }
+
     public BuildSide getBuildSide() {
         return side;
     }
+
+    public void setBuildSide(BuildSide side) {
+        this.side = side;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 1ac7614..acbb100 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -32,6 +32,8 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
+import org.apache.hyracks.algebricks.core.algebra.expressions.HashJoinExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.HashJoinExpressionAnnotation.BuildSide;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
@@ -65,11 +67,35 @@
         List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema();
         ILogicalExpression conditionExpr = op.getCondition().getValue();
         if (isHashJoinCondition(conditionExpr, varsLeft, varsRight, sideLeft, sideRight)) {
-            BroadcastSide side = getBroadcastJoinSide(conditionExpr);
-            if (side == null) {
-                setHashJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+            BroadcastSide broadcastSide = getBroadcastJoinSide(conditionExpr);
+            if (broadcastSide == null) {
+                BuildSide buildSide = getHashJoinBuildSide(conditionExpr);
+                if (buildSide == null) {
+                    setHashJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+                } else {
+                    switch (buildSide) {
+                        case RIGHT:
+                            setHashJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+                            break;
+                        case LEFT:
+                            if (op.getJoinKind() == AbstractBinaryJoinOperator.JoinKind.INNER) {
+                                Mutable<ILogicalOperator> opRef0 = op.getInputs().get(0);
+                                Mutable<ILogicalOperator> opRef1 = op.getInputs().get(1);
+                                ILogicalOperator tmp = opRef0.getValue();
+                                opRef0.setValue(opRef1.getValue());
+                                opRef1.setValue(tmp);
+                                setHashJoinOp(op, JoinPartitioningType.PAIRWISE, sideRight, sideLeft, context);
+                            } else {
+                                setHashJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+                            }
+                            break;
+                        default:
+                            // This should never happen
+                            throw new IllegalStateException(buildSide.toString());
+                    }
+                }
             } else {
-                switch (side) {
+                switch (broadcastSide) {
                     case RIGHT:
                         setHashJoinOp(op, JoinPartitioningType.BROADCAST, sideLeft, sideRight, context);
                         break;
@@ -87,7 +113,7 @@
                         break;
                     default:
                         // This should never happen
-                        throw new IllegalStateException(side.toString());
+                        throw new IllegalStateException(broadcastSide.toString());
                 }
             }
         } else {
@@ -214,6 +240,32 @@
         return null;
     }
 
+    /**
+     * Finds the hash-join build side hinted in {@code e}; null when no side is found or conjuncts disagree.
+     */
+    private static BuildSide getHashJoinBuildSide(ILogicalExpression e) {
+        if (e.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return null;
+        }
+        AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;
+        if (!fexp.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.AND)) {
+            // Not a conjunction: only an annotation on this call itself can carry the hint.
+            HashJoinExpressionAnnotation hashJoinAnnotation = fexp.getAnnotation(HashJoinExpressionAnnotation.class);
+            return hashJoinAnnotation == null ? null : hashJoinAnnotation.getBuildSide();
+        }
+        // Conjunction: the first non-null side wins; a later conflicting side cancels the hint.
+        BuildSide side = null;
+        for (Mutable<ILogicalExpression> a : fexp.getArguments()) {
+            BuildSide newSide = getHashJoinBuildSide(a.getValue());
+            if (side == null) {
+                side = newSide;
+            } else if (newSide != null && newSide != side) {
+                return null;
+            }
+        }
+        return side;
+    }
+
     private static void warnIfCrossProduct(ILogicalExpression conditionExpr, SourceLocation sourceLoc,
             IOptimizationContext context) {
         if (OperatorPropertiesUtil.isAlwaysTrueCond(conditionExpr) && sourceLoc != null) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17236
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I7ec73409c9c6fddba01ebe6a9d4eee46e94dd0de
Gerrit-Change-Number: 17236
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <al...@gmail.com>
Gerrit-MessageType: newchange