You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2018/11/01 18:10:06 UTC

asterixdb git commit: [ASTERIXDB-2286][COMP] Parallel sort changes p.2

Repository: asterixdb
Updated Branches:
  refs/heads/master 6034ece7e -> 654474692


[ASTERIXDB-2286][COMP] Parallel sort changes p.2

- user model changes: no
- storage format changes: no
- interface changes: no

details:
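This patch changes the way the SequentialMergeExchangePOperator
connector computes its local property: instead of blindly
propagating the child's local property, it now delivers a local
order property only for the prefix on which the child's local order
matches its global (ORDERED_PARTITIONED) order.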

The patch also includes minor code clean-ups (some methods were moved down).

Change-Id: Ie37e03b6fc6e55fc21f8324c0f09a7fa05b51769
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3005
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/65447469
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/65447469
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/65447469

Branch: refs/heads/master
Commit: 654474692dd19dd42e0b9efcc8209d05a8c41b2e
Parents: 6034ece
Author: Ali Alsuliman <al...@gmail.com>
Authored: Tue Oct 30 22:03:01 2018 -0700
Committer: Ali Alsuliman <al...@gmail.com>
Committed: Thu Nov 1 11:07:43 2018 -0700

----------------------------------------------------------------------
 .../p_sort_seq_merge/p_sort_seq_merge.sqlpp     |  41 +++++++
 .../p_sort_seq_merge/p_sort_seq_merge.plan      |  34 ++++++
 .../p_sort_seq_merge.1.ddl.sqlpp                |   3 +-
 .../common/config/CompilerProperties.java       |   2 +-
 .../std/RangeMapAggregateDescriptor.java        |   8 +-
 .../SequentialMergeExchangePOperator.java       |  75 ++++++++++--
 .../rules/EnforceStructuralPropertiesRule.java  | 118 +++++++++----------
 .../DeterministicPartitionBatchManager.java     |   2 +-
 .../collectors/SequentialMergeFrameReader.java  |   1 +
 9 files changed, 206 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp
new file mode 100644
index 0000000..473a52a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: testing a sequential merge when parallel sort has redistributed the data across partitions and one of
+ * the next operators requires merging the data. The local order property is not present, but ORDERED_PARTITIONED is
+ * present, and sequential merge connector will be introduced instead of a random merge.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type TestType as
+{
+  id: int,
+  f1: int
+};
+
+create  dataset TestDS(TestType) primary key id;
+
+set `compiler.sort.parallel` "true";
+
+[(select * from TestDS  v order by v.f1, v.id)];
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan
new file mode 100644
index 0000000..d3d1d85
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan
@@ -0,0 +1,34 @@
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      -- ASSIGN  |UNPARTITIONED|
+        -- AGGREGATE  |UNPARTITIONED|
+          -- SEQUENTIAL_MERGE_EXCHANGE  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STABLE_SORT [$$18(ASC), $$17(ASC)]  |PARTITIONED|
+                      -- RANGE_PARTITION_EXCHANGE [$$18(ASC), $$17(ASC)]  |PARTITIONED|
+                        -- FORWARD  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- REPLICATE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            -- AGGREGATE  |UNPARTITIONED|
+                              -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                -- AGGREGATE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- REPLICATE  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
index af54590..9f45ee3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
@@ -19,7 +19,8 @@
 
 /*
  * Description: testing a sequential merge when parallel sort has redistributed the data across partitions and one of
- * the next operators requires merging the sorted data.
+ * the next operators requires merging the data. The local order property is not present, but ORDERED_PARTITIONED is
+ * present, and sequential merge connector will be introduced instead of a random merge.
  */
 
 drop dataverse test if exists;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 4c58ad7..66c95ee 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -64,7 +64,7 @@ public class CompilerProperties extends AbstractProperties {
         COMPILER_SORT_SAMPLES(
                 INTEGER,
                 AlgebricksConfig.SORT_SAMPLES,
-                "The number of samples parallel sorting should " + "take from each partition");
+                "The number of samples which parallel sorting should take from each partition");
 
         private final IOptionType type;
         private final Object defaultValue;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index c967a94..d33a6f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -118,12 +118,12 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
             @Override
             public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new GlobalSamplingAggregateFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields);
+                return new RangeMapFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields);
             }
         };
     }
 
-    private class GlobalSamplingAggregateFunction implements IAggregateEvaluator {
+    private class RangeMapFunction implements IAggregateEvaluator {
         private final IScalarEvaluator localSamplesEval;
         private final IPointable localSamples;
         private final List<List<byte[]>> finalSamples;
@@ -138,8 +138,8 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
         private final ArrayBackedValueStorage storage;
 
         @SuppressWarnings("unchecked")
-        private GlobalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
-                boolean[] ascending, int numOfPartitions, int numOrderByFields) throws HyracksDataException {
+        private RangeMapFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean[] ascending,
+                int numOfPartitions, int numOrderByFields) throws HyracksDataException {
             localSamples = new VoidPointable();
             localSamplesEval = args[0].createScalarEvaluator(context);
             finalSamples = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
index df0b446..df9141b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,7 +30,11 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogi
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -39,6 +42,9 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor;
 
+/**
+ * A merging connector that merges the tuples sequentially from the partitions starting from the partition at index 0.
+ */
 public class SequentialMergeExchangePOperator extends AbstractExchangePOperator {
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -47,30 +53,75 @@ public class SequentialMergeExchangePOperator extends AbstractExchangePOperator
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
+    /**
+     * <Pre>
+     * The local properties delivered by this connector are either:
+     * 1. nothing if the child doesn't deliver any special property
+     * 2. order property if the child is locally ordered and globally ordered on the same prefix
+     *
+     * The partitioning property is always UNPARTITIONED since it's a merging connector
+     * </Pre>
+     * @param op the logical operator of this physical operator
+     * @param context optimization context, not used here
+     */
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
-            throws AlgebricksException {
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> childLocalProps = childOp.getDeliveredPhysicalProperties().getLocalProperties();
-        List<ILocalStructuralProperty> localProperties;
-        if (childLocalProps != null) {
-            localProperties = new ArrayList<>(childLocalProps);
-        } else {
-            localProperties = new ArrayList<>(0);
+        IPartitioningProperty childPartitioning = childOp.getDeliveredPhysicalProperties().getPartitioningProperty();
+        List<ILocalStructuralProperty> outputLocalProp = new ArrayList<>(0);
+        if (childLocalProps != null && !childLocalProps.isEmpty() && childPartitioning != null
+                && childPartitioning.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) {
+            // the child could have a local order property that matches its global order property
+            propagateChildProperties((OrderedPartitionedProperty) childPartitioning, childLocalProps, outputLocalProp);
         }
 
-        deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProperties);
+        deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, outputLocalProp);
     }
 
     @Override
     public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> createConnectorDescriptor(
-            IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
-            throws AlgebricksException {
+            IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
         IConnectorDescriptor connector = new MToOneSequentialMergingConnectorDescriptor(spec);
         return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE);
     }
+
+    /**
+     * Matches prefix of the child's local order property & global order property. If a prefix is determined, the
+     * local order property is propagated through this connector. In essence, the connector says it maintains the
+     * order originally present in the child.
+     * @param childPartitioning the global ordering property of the child made by ORDERED_PARTITIONED partitioning
+     * @param childLocalProps the local properties inside the partitions
+     * @param outputLocalProp the local property of the connector that will be modified if propagating prop. happens
+     */
+    private void propagateChildProperties(OrderedPartitionedProperty childPartitioning,
+            List<ILocalStructuralProperty> childLocalProps, List<ILocalStructuralProperty> outputLocalProp) {
+        ILocalStructuralProperty childLocalProp = childLocalProps.get(0);
+        // skip if the first property is a grouping property
+        if (childLocalProp.getPropertyType() == ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY) {
+            OrderColumn localOrderColumn;
+            List<OrderColumn> outputOrderColumns = new ArrayList<>();
+            List<OrderColumn> globalOrderColumns = childPartitioning.getOrderColumns();
+            List<OrderColumn> localOrderColumns = ((LocalOrderProperty) childLocalProp).getOrderColumns();
+            // start matching the order columns
+            for (int i = 0; i < localOrderColumns.size() && i < globalOrderColumns.size(); i++) {
+                localOrderColumn = localOrderColumns.get(i);
+                if (localOrderColumn.equals(globalOrderColumns.get(i))) {
+                    outputOrderColumns.add(localOrderColumn);
+                } else {
+                    // stop whenever the matching fails, end of prefix matching
+                    break;
+                }
+            }
+
+            if (!outputOrderColumns.isEmpty()) {
+                // found a prefix
+                outputLocalProp.add(new LocalOrderProperty(outputOrderColumns));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 52b3f59..96e2e53 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -167,60 +167,6 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         return changed;
     }
 
-    private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan,
-            IOptimizationContext context) throws AlgebricksException {
-        boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
-        boolean changed = false;
-        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            if (physOptimizeOp(root, pvector, nestedPlan, context)) {
-                changed = true;
-            }
-            AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue();
-            op.computeDeliveredPhysicalProperties(context);
-            if (loggerTraceEnabled) {
-                AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural properties for " + op.getPhysicalOperator()
-                        + ": " + op.getDeliveredPhysicalProperties() + "\n");
-            }
-        }
-        return changed;
-    }
-
-    // Gets the index of a child to start top-down data property enforcement.
-    // If there is a partitioning-compatible child with the operator in opRef,
-    // start from this child; otherwise, start from child zero.
-    private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
-            IOptimizationContext context) throws AlgebricksException {
-        IPhysicalPropertiesVector[] reqdProperties = null;
-        if (pr != null) {
-            reqdProperties = pr.getRequiredProperties();
-        }
-
-        List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
-        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
-            deliveredPartitioningPropertiesFromChildren
-                    .add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
-        }
-        int partitioningCompatibleChild = 0;
-        for (int i = 0; i < op.getInputs().size(); i++) {
-            IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
-            if (reqdProperties == null || reqdProperties[i] == null
-                    || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null
-                    || reqdProperties[i].getPartitioningProperty()
-                            .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i)
-                                    .getPartitioningType()) {
-                continue;
-            }
-            IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
-            // If child i's delivered partitioning property already satisfies the required property, stop and return the child index.
-            if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) {
-                partitioningCompatibleChild = i;
-                break;
-            }
-        }
-        return partitioningCompatibleChild;
-    }
-
     private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
             boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
 
@@ -359,6 +305,60 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         return changed;
     }
 
+    private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
+        boolean changed = false;
+        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+            if (physOptimizeOp(root, pvector, nestedPlan, context)) {
+                changed = true;
+            }
+            AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue();
+            op.computeDeliveredPhysicalProperties(context);
+            if (loggerTraceEnabled) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural properties for " + op.getPhysicalOperator()
+                        + ": " + op.getDeliveredPhysicalProperties() + "\n");
+            }
+        }
+        return changed;
+    }
+
+    // Gets the index of a child to start top-down data property enforcement.
+    // If there is a partitioning-compatible child with the operator in opRef,
+    // start from this child; otherwise, start from child zero.
+    private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        IPhysicalPropertiesVector[] reqdProperties = null;
+        if (pr != null) {
+            reqdProperties = pr.getRequiredProperties();
+        }
+
+        List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+            deliveredPartitioningPropertiesFromChildren
+                    .add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
+        }
+        int partitioningCompatibleChild = 0;
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
+            if (reqdProperties == null || reqdProperties[i] == null
+                    || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null
+                    || reqdProperties[i].getPartitioningProperty()
+                            .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i)
+                                    .getPartitioningType()) {
+                continue;
+            }
+            IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
+            // If child i's delivered partitioning property already satisfies the required property, stop and return the child index.
+            if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) {
+                partitioningCompatibleChild = i;
+                break;
+            }
+        }
+        return partitioningCompatibleChild;
+    }
+
     private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
             IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
             throws AlgebricksException {
@@ -888,13 +888,13 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         return forwardOperator;
     }
 
-    private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
-        for (ILocalStructuralProperty lsp : cldLocals) {
-            if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
+    private boolean allAreOrderProps(List<ILocalStructuralProperty> childLocalProperties) {
+        for (ILocalStructuralProperty childLocalProperty : childLocalProperties) {
+            if (childLocalProperty.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
                 return false;
             }
         }
-        return !cldLocals.isEmpty();
+        return !childLocalProperties.isEmpty();
     }
 
     private void printOp(AbstractLogicalOperator op) throws AlgebricksException {
@@ -927,7 +927,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             throws AlgebricksException {
         ILogicalOperator oldOp = opRef.getValue();
         opRef.setValue(newOp);
-        newOp.getInputs().add(new MutableObject<ILogicalOperator>(oldOp));
+        newOp.getInputs().add(new MutableObject<>(oldOp));
         newOp.recomputeSchema();
         newOp.computeDeliveredPhysicalProperties(context);
         context.computeAndSetTypeEnvironmentForOperator(newOp);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
index c437619..0b1c0a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
@@ -69,7 +69,7 @@ public class DeterministicPartitionBatchManager implements IPartitionBatchManage
         }
     }
 
-    private synchronized boolean allPartitionsAdded() {
+    private boolean allPartitionsAdded() {
         for (int i = 0; i < partitions.length; i++) {
             if (partitions[i] == null) {
                 return false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/65447469/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
index 2646c94..6aa305b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+// TODO(ali): consider sort-concat-merge as an alternative.
 public class SequentialMergeFrameReader implements IFrameReader {
     private final int numSenders;
     private final IPartitionBatchManager partitionBatchManager;