You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:54:52 UTC

[02/50] [abbrv] asterixdb git commit: snapshot super interval dag.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
new file mode 100644
index 0000000..e9f9db0
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+/**
+ * The right input is broadcast and the left input can be partitioned in any way.
+ */
+public class NestedLoopJoinPOperator extends AbstractJoinPOperator {
+
+    private final int memSize;
+
+    public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+        super(kind, partitioningType);
+        this.memSize = memSize;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.NESTED_LOOP;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+        }
+
+        IPartitioningProperty pp;
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+            IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
+            if (pv1 == null) {
+                pp = null;
+            } else {
+                pp = pv1.getPartitioningProperty();
+            }
+        } else {
+            pp = IPartitioningProperty.UNPARTITIONED;
+        }
+
+        // Nested loop join cannot maintain the local structure property for the probe side
+        // because of the I/O optimization for the build branch.
+        this.deliveredProperties = new StructuralPropertiesVector(pp, null);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+        }
+
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+
+        // TODO: leverage statistics to make better decisions.
+        pv[0] = new StructuralPropertiesVector(null, null);
+        pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+                null);
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
+        conditionInputSchemas[0] = propagatedSchema;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
+                context.getTypeEnvironment(op), conditionInputSchemas, context);
+        ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
+                context.getBinaryBooleanInspectorFactory());
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+
+        switch (kind) {
+            case INNER: {
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+                        null);
+                break;
+            }
+            case LEFT_OUTER: {
+                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
+                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
+                }
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+                        nonMatchWriterFactories);
+                break;
+            }
+            default: {
+                throw new NotImplementedException();
+            }
+        }
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
+
+        private static final long serialVersionUID = 1L;
+        private final IScalarEvaluatorFactory cond;
+        private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+        public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
+                IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+            this.cond = cond;
+            this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+        }
+
+        @Override
+        public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+            return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
+        }
+    }
+
+    public static class TuplePairEvaluator implements ITuplePairComparator {
+        private final IHyracksTaskContext ctx;
+        private IScalarEvaluator condEvaluator;
+        private final IScalarEvaluatorFactory condFactory;
+        private final IPointable p;
+        private final CompositeFrameTupleReference compositeTupleRef;
+        private final FrameTupleReference leftRef;
+        private final FrameTupleReference rightRef;
+        private final IBinaryBooleanInspector binaryBooleanInspector;
+
+        public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
+                IBinaryBooleanInspector binaryBooleanInspector) {
+            this.ctx = ctx;
+            this.condFactory = condFactory;
+            this.binaryBooleanInspector = binaryBooleanInspector;
+            this.leftRef = new FrameTupleReference();
+            this.p = VoidPointable.FACTORY.createPointable();
+            this.rightRef = new FrameTupleReference();
+            this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+                int innerIndex) throws HyracksDataException {
+            if (condEvaluator == null) {
+                try {
+                    this.condEvaluator = condFactory.createScalarEvaluator(ctx);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+            }
+            compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
+            try {
+                condEvaluator.evaluate(compositeTupleRef, p);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+            boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
+                    p.getLength());
+            if (result) {
+                return 0;
+            } else {
+                return 1;
+            }
+        }
+    }
+
+    public static class CompositeFrameTupleReference implements IFrameTupleReference {
+
+        private final FrameTupleReference refLeft;
+        private final FrameTupleReference refRight;
+
+        public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
+            this.refLeft = refLeft;
+            this.refRight = refRight;
+        }
+
+        public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+                int innerIndex) {
+            refLeft.reset(outerAccessor, outerIndex);
+            refRight.reset(innerAccessor, innerIndex);
+        }
+
+        @Override
+        public int getFieldCount() {
+            return refLeft.getFieldCount() + refRight.getFieldCount();
+        }
+
+        @Override
+        public byte[] getFieldData(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount) {
+                return refLeft.getFieldData(fIdx);
+            } else {
+                return refRight.getFieldData(fIdx - leftFieldCount);
+            }
+        }
+
+        @Override
+        public int getFieldStart(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount) {
+                return refLeft.getFieldStart(fIdx);
+            } else {
+                return refRight.getFieldStart(fIdx - leftFieldCount);
+            }
+        }
+
+        @Override
+        public int getFieldLength(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount) {
+                return refLeft.getFieldLength(fIdx);
+            } else {
+                return refRight.getFieldLength(fIdx - leftFieldCount);
+            }
+        }
+
+        @Override
+        public IFrameTupleAccessor getFrameTupleAccessor() {
+            throw new NotImplementedException();
+        }
+
+        @Override
+        public int getTupleIndex() {
+            throw new NotImplementedException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index d6bd554..8299f61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -79,6 +79,14 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
         return partitioningFields;
     }
 
+    public RangePartitioningType getRangeType() {
+        return rangeType;
+    }
+
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+
     public INodeDomain getDomain() {
         return domain;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index ef37d8c..9667427 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -82,6 +82,14 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
         return partitioningFields;
     }
 
+    public RangePartitioningType getRangeType() {
+        return rangeType;
+    }
+
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+
     public INodeDomain getDomain() {
         return domain;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 14a8f16..db778f7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -89,7 +89,7 @@ public class ReplicatePOperator extends AbstractPhysicalOperator {
                 outputDependencyLabels[i] = 1;
             }
         }
-        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index e6517d0..823294e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -21,15 +21,12 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 import java.util.ArrayList;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
index fee9174..9d2a5da 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
@@ -36,9 +36,9 @@ public class PropagatingTypeEnvironment extends AbstractTypeEnvironment {
 
     private final ITypeEnvPointer[] envPointers;
 
-    private final List<LogicalVariable> nonNullVariables = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> nonNullVariables = new ArrayList<>();
 
-    private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<List<LogicalVariable>>();
+    private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<>();
 
     public PropagatingTypeEnvironment(IExpressionTypeComputer expressionTypeComputer,
             IMissableTypeComputer nullableTypeComputer, IMetadataProvider<?, ?> metadataProvider,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 02e4c8a..45f1b76 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -44,7 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.Log
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.NLJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 
@@ -83,8 +83,8 @@ public class JoinUtils {
     }
 
     private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
-        op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context
-                .getPhysicalOptimizationConfig().getMaxRecordsPerFrame()));
+        op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context
+                .getPhysicalOptimizationConfig().getMaxFramesForJoin()));
     }
 
     private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 9ce59ae..5290af4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -140,9 +140,9 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void flush() throws HyracksDataException {
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        writers[i].flush();
-                    }
+//                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+//                        writers[i].flush();
+//                    }
                 }
 
                 @Override