Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/07/14 17:13:31 UTC

asterixdb git commit: Rename join operator to be consistent with other join classes.

Repository: asterixdb
Updated Branches:
  refs/heads/master 5b2d4c89c -> c1f984e65


Rename join operator to be consistent with other join classes.

Change-Id: Ib38dba95243e894a2b1950de60ac7ab53ba007ca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/994
Reviewed-by: Yingyi Bu <bu...@gmail.com>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c1f984e6
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c1f984e6
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c1f984e6

Branch: refs/heads/master
Commit: c1f984e655221bfe7bd444f9f39c3c777cf9c2cf
Parents: 5b2d4c8
Author: Preston Carman <pr...@apache.org>
Authored: Thu Jul 14 09:18:13 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Jul 14 10:12:57 2016 -0700

----------------------------------------------------------------------
 .../operators/physical/NLJoinPOperator.java     | 295 -------------------
 .../physical/NestedLoopJoinPOperator.java       | 295 +++++++++++++++++++
 .../algebricks/rewriter/util/JoinUtils.java     |   8 +-
 3 files changed, 299 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
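For readers skimming the diffstat: the only behavioral surface of this change is the class name, and the JoinUtils hunk at the bottom shows the single call site that had to follow the rename. The snippet below is a minimal, self-contained sketch of that call-site pattern under the new name; the wrapper class name and the frame budget value are placeholders for illustration, while the constructor arguments mirror the signature visible in the diff.

import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;

public class NestedLoopJoinRenameExample {
    // Attach the renamed physical operator to a logical join, mirroring what
    // JoinUtils.setNestedLoopJoinOp does after this commit.
    static void assignNestedLoopJoin(AbstractBinaryJoinOperator join) {
        // Placeholder budget; JoinUtils obtains this from
        // context.getPhysicalOptimizationConfig().getMaxFramesForJoin().
        int maxFramesForJoin = 32;
        join.setPhysicalOperator(new NestedLoopJoinPOperator(join.getJoinKind(),
                JoinPartitioningType.BROADCAST, maxFramesForJoin));
    }
}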


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c1f984e6/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
deleted file mode 100644
index 5384347..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
-
-/**
- * The right input is broadcast and the left input can be partitioned in any way.
- */
-public class NLJoinPOperator extends AbstractJoinPOperator {
-
-    private final int memSize;
-
-    public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
-        super(kind, partitioningType);
-        this.memSize = memSize;
-    }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.NESTED_LOOP;
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
-        if (partitioningType != JoinPartitioningType.BROADCAST) {
-            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
-        }
-
-        IPartitioningProperty pp;
-
-        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
-
-        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
-            IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
-            if (pv1 == null) {
-                pp = null;
-            } else {
-                pp = pv1.getPartitioningProperty();
-            }
-        } else {
-            pp = IPartitioningProperty.UNPARTITIONED;
-        }
-
-        // Nested loop join cannot maintain the local structure property for the probe side
-        // because of the I/O optimization for the build branch.
-        this.deliveredProperties = new StructuralPropertiesVector(pp, null);
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        if (partitioningType != JoinPartitioningType.BROADCAST) {
-            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
-        }
-
-        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
-
-        // TODO: leverage statistics to make better decisions.
-        pv[0] = new StructuralPropertiesVector(null, null);
-        pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
-                null);
-        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
-                propagatedSchema, context);
-        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
-        conditionInputSchemas[0] = propagatedSchema;
-        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
-        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
-                context.getTypeEnvironment(op), conditionInputSchemas, context);
-        ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
-                context.getBinaryBooleanInspectorFactory());
-        IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        IOperatorDescriptor opDesc = null;
-
-        switch (kind) {
-            case INNER: {
-                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
-                        null);
-                break;
-            }
-            case LEFT_OUTER: {
-                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
-                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                }
-                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
-                        nonMatchWriterFactories);
-                break;
-            }
-            default: {
-                throw new NotImplementedException();
-            }
-        }
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
-
-        ILogicalOperator src1 = op.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src1, 0, op, 0);
-        ILogicalOperator src2 = op.getInputs().get(1).getValue();
-        builder.contributeGraphEdge(src2, 0, op, 1);
-    }
-
-    public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
-
-        private static final long serialVersionUID = 1L;
-        private final IScalarEvaluatorFactory cond;
-        private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
-
-        public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
-                IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
-            this.cond = cond;
-            this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
-        }
-
-        @Override
-        public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
-            return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
-        }
-    }
-
-    public static class TuplePairEvaluator implements ITuplePairComparator {
-        private final IHyracksTaskContext ctx;
-        private IScalarEvaluator condEvaluator;
-        private final IScalarEvaluatorFactory condFactory;
-        private final IPointable p;
-        private final CompositeFrameTupleReference compositeTupleRef;
-        private final FrameTupleReference leftRef;
-        private final FrameTupleReference rightRef;
-        private final IBinaryBooleanInspector binaryBooleanInspector;
-
-        public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
-                IBinaryBooleanInspector binaryBooleanInspector) {
-            this.ctx = ctx;
-            this.condFactory = condFactory;
-            this.binaryBooleanInspector = binaryBooleanInspector;
-            this.leftRef = new FrameTupleReference();
-            this.p = VoidPointable.FACTORY.createPointable();
-            this.rightRef = new FrameTupleReference();
-            this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
-        }
-
-        @Override
-        public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
-                int innerIndex) throws HyracksDataException {
-            if (condEvaluator == null) {
-                try {
-                    this.condEvaluator = condFactory.createScalarEvaluator(ctx);
-                } catch (AlgebricksException ae) {
-                    throw new HyracksDataException(ae);
-                }
-            }
-            compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
-            try {
-                condEvaluator.evaluate(compositeTupleRef, p);
-            } catch (AlgebricksException ae) {
-                throw new HyracksDataException(ae);
-            }
-            boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
-                    p.getLength());
-            if (result) {
-                return 0;
-            } else {
-                return 1;
-            }
-        }
-    }
-
-    public static class CompositeFrameTupleReference implements IFrameTupleReference {
-
-        private final FrameTupleReference refLeft;
-        private final FrameTupleReference refRight;
-
-        public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
-            this.refLeft = refLeft;
-            this.refRight = refRight;
-        }
-
-        public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
-                int innerIndex) {
-            refLeft.reset(outerAccessor, outerIndex);
-            refRight.reset(innerAccessor, innerIndex);
-        }
-
-        @Override
-        public int getFieldCount() {
-            return refLeft.getFieldCount() + refRight.getFieldCount();
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-            int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount) {
-                return refLeft.getFieldData(fIdx);
-            } else {
-                return refRight.getFieldData(fIdx - leftFieldCount);
-            }
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount) {
-                return refLeft.getFieldStart(fIdx);
-            } else {
-                return refRight.getFieldStart(fIdx - leftFieldCount);
-            }
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount) {
-                return refLeft.getFieldLength(fIdx);
-            } else {
-                return refRight.getFieldLength(fIdx - leftFieldCount);
-            }
-        }
-
-        @Override
-        public IFrameTupleAccessor getFrameTupleAccessor() {
-            throw new NotImplementedException();
-        }
-
-        @Override
-        public int getTupleIndex() {
-            throw new NotImplementedException();
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c1f984e6/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
new file mode 100644
index 0000000..e9f9db0
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+/**
+ * The right input is broadcast and the left input can be partitioned in any way.
+ */
+public class NestedLoopJoinPOperator extends AbstractJoinPOperator {
+
+    private final int memSize;
+
+    public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+        super(kind, partitioningType);
+        this.memSize = memSize;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.NESTED_LOOP;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+        }
+
+        IPartitioningProperty pp;
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+            IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
+            if (pv1 == null) {
+                pp = null;
+            } else {
+                pp = pv1.getPartitioningProperty();
+            }
+        } else {
+            pp = IPartitioningProperty.UNPARTITIONED;
+        }
+
+        // Nested loop join cannot maintain the local structure property for the probe side
+        // because of the I/O optimization for the build branch.
+        this.deliveredProperties = new StructuralPropertiesVector(pp, null);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        if (partitioningType != JoinPartitioningType.BROADCAST) {
+            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+        }
+
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+
+        // TODO: leverage statistics to make better decisions.
+        pv[0] = new StructuralPropertiesVector(null, null);
+        pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+                null);
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
+        conditionInputSchemas[0] = propagatedSchema;
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
+                context.getTypeEnvironment(op), conditionInputSchemas, context);
+        ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
+                context.getBinaryBooleanInspectorFactory());
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        IOperatorDescriptor opDesc = null;
+
+        switch (kind) {
+            case INNER: {
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+                        null);
+                break;
+            }
+            case LEFT_OUTER: {
+                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
+                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
+                }
+                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+                        nonMatchWriterFactories);
+                break;
+            }
+            default: {
+                throw new NotImplementedException();
+            }
+        }
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        ILogicalOperator src1 = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src1, 0, op, 0);
+        ILogicalOperator src2 = op.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(src2, 0, op, 1);
+    }
+
+    public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
+
+        private static final long serialVersionUID = 1L;
+        private final IScalarEvaluatorFactory cond;
+        private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+        public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
+                IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+            this.cond = cond;
+            this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+        }
+
+        @Override
+        public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+            return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
+        }
+    }
+
+    public static class TuplePairEvaluator implements ITuplePairComparator {
+        private final IHyracksTaskContext ctx;
+        private IScalarEvaluator condEvaluator;
+        private final IScalarEvaluatorFactory condFactory;
+        private final IPointable p;
+        private final CompositeFrameTupleReference compositeTupleRef;
+        private final FrameTupleReference leftRef;
+        private final FrameTupleReference rightRef;
+        private final IBinaryBooleanInspector binaryBooleanInspector;
+
+        public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
+                IBinaryBooleanInspector binaryBooleanInspector) {
+            this.ctx = ctx;
+            this.condFactory = condFactory;
+            this.binaryBooleanInspector = binaryBooleanInspector;
+            this.leftRef = new FrameTupleReference();
+            this.p = VoidPointable.FACTORY.createPointable();
+            this.rightRef = new FrameTupleReference();
+            this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+                int innerIndex) throws HyracksDataException {
+            if (condEvaluator == null) {
+                try {
+                    this.condEvaluator = condFactory.createScalarEvaluator(ctx);
+                } catch (AlgebricksException ae) {
+                    throw new HyracksDataException(ae);
+                }
+            }
+            compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
+            try {
+                condEvaluator.evaluate(compositeTupleRef, p);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+            boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
+                    p.getLength());
+            if (result) {
+                return 0;
+            } else {
+                return 1;
+            }
+        }
+    }
+
+    public static class CompositeFrameTupleReference implements IFrameTupleReference {
+
+        private final FrameTupleReference refLeft;
+        private final FrameTupleReference refRight;
+
+        public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
+            this.refLeft = refLeft;
+            this.refRight = refRight;
+        }
+
+        public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+                int innerIndex) {
+            refLeft.reset(outerAccessor, outerIndex);
+            refRight.reset(innerAccessor, innerIndex);
+        }
+
+        @Override
+        public int getFieldCount() {
+            return refLeft.getFieldCount() + refRight.getFieldCount();
+        }
+
+        @Override
+        public byte[] getFieldData(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount) {
+                return refLeft.getFieldData(fIdx);
+            } else {
+                return refRight.getFieldData(fIdx - leftFieldCount);
+            }
+        }
+
+        @Override
+        public int getFieldStart(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount) {
+                return refLeft.getFieldStart(fIdx);
+            } else {
+                return refRight.getFieldStart(fIdx - leftFieldCount);
+            }
+        }
+
+        @Override
+        public int getFieldLength(int fIdx) {
+            int leftFieldCount = refLeft.getFieldCount();
+            if (fIdx < leftFieldCount) {
+                return refLeft.getFieldLength(fIdx);
+            } else {
+                return refRight.getFieldLength(fIdx - leftFieldCount);
+            }
+        }
+
+        @Override
+        public IFrameTupleAccessor getFrameTupleAccessor() {
+            throw new NotImplementedException();
+        }
+
+        @Override
+        public int getTupleIndex() {
+            throw new NotImplementedException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c1f984e6/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 56ea55e..3332836 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -44,7 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.Log
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.NLJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 
@@ -80,12 +80,12 @@ public class JoinUtils {
                 }
             }
         } else {
-            setNLJoinOp(op, context);
+            setNestedLoopJoinOp(op, context);
         }
     }
 
-    private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
-        op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
+    private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
+        op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
                 context.getPhysicalOptimizationConfig().getMaxFramesForJoin()));
     }