You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2015/10/09 05:22:54 UTC

incubator-asterixdb-hyracks git commit: ASTERIXDB-221: reduce unnecessary partitioning for hash joins.

Repository: incubator-asterixdb-hyracks
Updated Branches:
  refs/heads/master aaa888ba6 -> c4118bd0c


ASTERIXDB-221: reduce unnecessary partitioning for hash joins.

For a hash join, start top-down data property optimization from
a partitioning-compatible child, so that the other child's
partitioning requirement can be updated accordingly.

Change-Id: I835ea712c2f427149d45464fcb3841b8d33f6507
Reviewed-on: https://asterix-gerrit.ics.uci.edu/395
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Wenhai Li <lw...@yahoo.com>
Reviewed-by: Yingyi Bu <bu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/c4118bd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/c4118bd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/c4118bd0

Branch: refs/heads/master
Commit: c4118bd0c10ccacf5d13e436e151c4b7e86b61b7
Parents: aaa888b
Author: Yingyi Bu <bu...@gmail.com>
Authored: Thu Sep 24 21:06:28 2015 -0700
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Thu Oct 8 20:17:53 2015 -0700

----------------------------------------------------------------------
 .../physical/AbstractHashJoinPOperator.java     | 11 +++-
 .../core/algebra/properties/PropertiesUtil.java |  9 ++-
 .../rules/EnforceStructuralPropertiesRule.java  | 66 ++++++++++++++++----
 3 files changed, 65 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c4118bd0/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 1091e1a..f5ea5f1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -138,11 +138,16 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                     Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
                                     Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
                                     Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+                                    Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
+                                    List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent)) ? keysRightBranch
+                                            : keysLeftBranch;
+                                    List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
+                                            : keysRightBranch;
                                     for (LogicalVariable r : uppreq.getColumnSet()) {
                                         EquivalenceClass ecSnd = eqmap.get(r);
                                         boolean found = false;
                                         int j = 0;
-                                        for (LogicalVariable rvar : keysRightBranch) {
+                                        for (LogicalVariable rvar : keysFirst) {
                                             if (rvar == r || ecSnd != null && eqmap.get(rvar) == ecSnd) {
                                                 found = true;
                                                 break;
@@ -151,9 +156,9 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                         }
                                         if (!found) {
                                             throw new IllegalStateException("Did not find a variable equivalent to "
-                                                    + r + " among " + keysRightBranch);
+                                                    + r + " among " + keysFirst);
                                         }
-                                        LogicalVariable v2 = keysLeftBranch.get(j);
+                                        LogicalVariable v2 = keysSecond.get(j);
                                         EquivalenceClass ecFst = eqmap.get(v2);
                                         for (LogicalVariable vset1 : set1) {
                                             if (vset1 == v2 || ecFst != null && eqmap.get(vset1) == ecFst) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c4118bd0/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index af40f67..ae9f4f1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -132,11 +132,10 @@ public class PropertiesUtil {
                     case UNORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
                         UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
-                        if (mayExpandProperties) {
-                            return isPrefixOf(ud.getColumnSet().iterator(), ur.getColumnSet().iterator());
-                        } else {
-                            return ur.getColumnSet().equals(ud.getColumnSet());
-                        }
+                        if (mayExpandProperties)
+                            return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
+                        else
+                            return (ud.getColumnSet().equals(ur.getColumnSet()));
                     }
                     case ORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/c4118bd0/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 4df9db7..2181efa 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -27,7 +27,6 @@ import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -154,6 +153,43 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         return changed;
     }
 
+    // Returns the index of the child from which top-down data property enforcement should start:
+    // the first child whose delivered partitioning has the required partitioning type and satisfies op's requirement,
+    // otherwise child zero (also when pr is null). NOTE(review): nestedPlan and context are currently unused.
+    private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        IPhysicalPropertiesVector[] reqdProperties = null;
+        if (pr != null) {
+            reqdProperties = pr.getRequiredProperties();
+        }
+
+        List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<IPartitioningProperty>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+            deliveredPartitioningPropertiesFromChildren.add(child.getDeliveredPhysicalProperties()
+                    .getPartitioningProperty());
+        }
+        int partitioningCompatibleChild = 0;
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
+            if (reqdProperties == null
+                    || reqdProperties[i] == null
+                    || reqdProperties[i].getPartitioningProperty() == null
+                    || deliveredPropertyFromChild == null
+                    || reqdProperties[i].getPartitioningProperty().getPartitioningType() != deliveredPartitioningPropertiesFromChildren
+                            .get(i).getPartitioningType()) {
+                continue;
+            }
+            IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
+            // Child i's delivered partitioning satisfies its requirement (expansion allowed), so start from child i.
+            if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) {
+                partitioningCompatibleChild = i;
+                break;
+            }
+        }
+        return partitioningCompatibleChild;
+    }
+
     private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
             boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
 
@@ -201,23 +237,31 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             }
         }
 
+        // The child index of the child operator to optimize first.
+        int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
         IPartitioningProperty firstDeliveredPartitioning = null;
-        int i = 0;
-        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+        // Enforce data properties in a top-down manner.
+        for (int j = 0; j < op.getInputs().size(); j++) {
+            // Starts from a partitioning-compatible child if any to loop over all children.
+            int childIndex = (j + startChildIndex) % op.getInputs().size();
+            IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex];
+            AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
             IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
 
             AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator()
                     + ": " + delivered + "\n");
             IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
+            // Coordinates requirements by looking at the firstDeliveredPartitioning.
             Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
-                    reqdProperties[i].getPartitioningProperty(), firstDeliveredPartitioning, op, context);
+                    requiredProperty.getPartitioningProperty(), firstDeliveredPartitioning, op, context);
             boolean mayExpandPartitioningProperties = pbpp.first;
             IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
-                    reqdProperties[i].getLocalProperties());
+                    requiredProperty.getLocalProperties());
 
             AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator()
                     + ": " + rqd + "\n");
+            // The partitioning property of reqdProperties[childIndex] could be updated here because
+            // rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty().
             IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
                     mayExpandPartitioningProperties, context.getEquivalenceClassMap(child), context.getFDList(child));
 
@@ -227,9 +271,9 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
             if (diff != null) {
                 changed = true;
-                addEnforcers(op, i, diff, rqd, delivered, childrenDomain, nestedPlan, context);
+                addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
 
-                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(i).getValue());
+                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex).getValue());
 
                 if (newChild != child) {
                     delivered = newChild.getDeliveredPhysicalProperties();
@@ -242,8 +286,8 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                         break;
                     }
                 }
-
             }
+
             if (firstDeliveredPartitioning == null) {
                 IPartitioningProperty dpp = delivered.getPartitioningProperty();
                 if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
@@ -251,8 +295,6 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                     firstDeliveredPartitioning = dpp;
                 }
             }
-
-            i++;
         }
 
         if (op.hasNestedPlans()) {
@@ -279,7 +321,6 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             // Now, transfer annotations from the original sort op. to this one.
             AbstractLogicalOperator transferTo = nextOp;
             if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
-                //
                 // remove duplicate exchange operator
                 transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
             }
@@ -598,7 +639,6 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             }
             return ordCols;
         }
-
     }
 
     private void setNewOp(Mutable<ILogicalOperator> opRef, AbstractLogicalOperator newOp, IOptimizationContext context)