You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/07/14 17:13:31 UTC
asterixdb git commit: Rename join operator to be consistent with
other join classes.
Repository: asterixdb
Updated Branches:
refs/heads/master 5b2d4c89c -> c1f984e65
Rename join operator to be consistent with other join classes.
Change-Id: Ib38dba95243e894a2b1950de60ac7ab53ba007ca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/994
Reviewed-by: Yingyi Bu <bu...@gmail.com>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c1f984e6
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c1f984e6
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c1f984e6
Branch: refs/heads/master
Commit: c1f984e655221bfe7bd444f9f39c3c777cf9c2cf
Parents: 5b2d4c8
Author: Preston Carman <pr...@apache.org>
Authored: Thu Jul 14 09:18:13 2016 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Jul 14 10:12:57 2016 -0700
----------------------------------------------------------------------
.../operators/physical/NLJoinPOperator.java | 295 -------------------
.../physical/NestedLoopJoinPOperator.java | 295 +++++++++++++++++++
.../algebricks/rewriter/util/JoinUtils.java | 8 +-
3 files changed, 299 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c1f984e6/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
deleted file mode 100644
index 5384347..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
-
-/**
- * The right input is broadcast and the left input can be partitioned in any way.
- */
-public class NLJoinPOperator extends AbstractJoinPOperator {
-
- private final int memSize;
-
- public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
- super(kind, partitioningType);
- this.memSize = memSize;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.NESTED_LOOP;
- }
-
- @Override
- public boolean isMicroOperator() {
- return false;
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
- if (partitioningType != JoinPartitioningType.BROADCAST) {
- throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
- }
-
- IPartitioningProperty pp;
-
- AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
-
- if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
- IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
- if (pv1 == null) {
- pp = null;
- } else {
- pp = pv1.getPartitioningProperty();
- }
- } else {
- pp = IPartitioningProperty.UNPARTITIONED;
- }
-
- // Nested loop join cannot maintain the local structure property for the probe side
- // because of the I/O optimization for the build branch.
- this.deliveredProperties = new StructuralPropertiesVector(pp, null);
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- if (partitioningType != JoinPartitioningType.BROADCAST) {
- throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
- }
-
- StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
-
- // TODO: leverage statistics to make better decisions.
- pv[0] = new StructuralPropertiesVector(null, null);
- pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
- null);
- return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
- propagatedSchema, context);
- IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
- conditionInputSchemas[0] = propagatedSchema;
- IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
- IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
- context.getTypeEnvironment(op), conditionInputSchemas, context);
- ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
- context.getBinaryBooleanInspectorFactory());
- IOperatorDescriptorRegistry spec = builder.getJobSpec();
- IOperatorDescriptor opDesc = null;
-
- switch (kind) {
- case INNER: {
- opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
- null);
- break;
- }
- case LEFT_OUTER: {
- IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
- for (int j = 0; j < nonMatchWriterFactories.length; j++) {
- nonMatchWriterFactories[j] = context.getMissingWriterFactory();
- }
- opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
- nonMatchWriterFactories);
- break;
- }
- default: {
- throw new NotImplementedException();
- }
- }
- contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
-
- ILogicalOperator src1 = op.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src1, 0, op, 0);
- ILogicalOperator src2 = op.getInputs().get(1).getValue();
- builder.contributeGraphEdge(src2, 0, op, 1);
- }
-
- public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
-
- private static final long serialVersionUID = 1L;
- private final IScalarEvaluatorFactory cond;
- private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
-
- public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
- IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
- this.cond = cond;
- this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
- }
-
- @Override
- public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
- return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
- }
- }
-
- public static class TuplePairEvaluator implements ITuplePairComparator {
- private final IHyracksTaskContext ctx;
- private IScalarEvaluator condEvaluator;
- private final IScalarEvaluatorFactory condFactory;
- private final IPointable p;
- private final CompositeFrameTupleReference compositeTupleRef;
- private final FrameTupleReference leftRef;
- private final FrameTupleReference rightRef;
- private final IBinaryBooleanInspector binaryBooleanInspector;
-
- public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
- IBinaryBooleanInspector binaryBooleanInspector) {
- this.ctx = ctx;
- this.condFactory = condFactory;
- this.binaryBooleanInspector = binaryBooleanInspector;
- this.leftRef = new FrameTupleReference();
- this.p = VoidPointable.FACTORY.createPointable();
- this.rightRef = new FrameTupleReference();
- this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
- }
-
- @Override
- public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
- int innerIndex) throws HyracksDataException {
- if (condEvaluator == null) {
- try {
- this.condEvaluator = condFactory.createScalarEvaluator(ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
- try {
- condEvaluator.evaluate(compositeTupleRef, p);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
- p.getLength());
- if (result) {
- return 0;
- } else {
- return 1;
- }
- }
- }
-
- public static class CompositeFrameTupleReference implements IFrameTupleReference {
-
- private final FrameTupleReference refLeft;
- private final FrameTupleReference refRight;
-
- public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
- this.refLeft = refLeft;
- this.refRight = refRight;
- }
-
- public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
- int innerIndex) {
- refLeft.reset(outerAccessor, outerIndex);
- refRight.reset(innerAccessor, innerIndex);
- }
-
- @Override
- public int getFieldCount() {
- return refLeft.getFieldCount() + refRight.getFieldCount();
- }
-
- @Override
- public byte[] getFieldData(int fIdx) {
- int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount) {
- return refLeft.getFieldData(fIdx);
- } else {
- return refRight.getFieldData(fIdx - leftFieldCount);
- }
- }
-
- @Override
- public int getFieldStart(int fIdx) {
- int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount) {
- return refLeft.getFieldStart(fIdx);
- } else {
- return refRight.getFieldStart(fIdx - leftFieldCount);
- }
- }
-
- @Override
- public int getFieldLength(int fIdx) {
- int leftFieldCount = refLeft.getFieldCount();
- if (fIdx < leftFieldCount) {
- return refLeft.getFieldLength(fIdx);
- } else {
- return refRight.getFieldLength(fIdx - leftFieldCount);
- }
- }
-
- @Override
- public IFrameTupleAccessor getFrameTupleAccessor() {
- throw new NotImplementedException();
- }
-
- @Override
- public int getTupleIndex() {
- throw new NotImplementedException();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c1f984e6/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
new file mode 100644
index 0000000..e9f9db0
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+/**
+ * The right input is broadcast and the left input can be partitioned in any way.
+ */
+public class NestedLoopJoinPOperator extends AbstractJoinPOperator {
+
+ private final int memSize;
+
+ public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+ super(kind, partitioningType);
+ this.memSize = memSize;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.NESTED_LOOP;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
+ if (partitioningType != JoinPartitioningType.BROADCAST) {
+ throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+ }
+
+ IPartitioningProperty pp;
+
+ AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+ IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
+ if (pv1 == null) {
+ pp = null;
+ } else {
+ pp = pv1.getPartitioningProperty();
+ }
+ } else {
+ pp = IPartitioningProperty.UNPARTITIONED;
+ }
+
+ // Nested loop join cannot maintain the local structure property for the probe side
+ // because of the I/O optimization for the build branch.
+ this.deliveredProperties = new StructuralPropertiesVector(pp, null);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ if (partitioningType != JoinPartitioningType.BROADCAST) {
+ throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+ }
+
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+
+ // TODO: leverage statistics to make better decisions.
+ pv[0] = new StructuralPropertiesVector(null, null);
+ pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+ null);
+ return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+ IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
+ conditionInputSchemas[0] = propagatedSchema;
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
+ context.getTypeEnvironment(op), conditionInputSchemas, context);
+ ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
+ context.getBinaryBooleanInspectorFactory());
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IOperatorDescriptor opDesc = null;
+
+ switch (kind) {
+ case INNER: {
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+ null);
+ break;
+ }
+ case LEFT_OUTER: {
+ IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nonMatchWriterFactories.length; j++) {
+ nonMatchWriterFactories[j] = context.getMissingWriterFactory();
+ }
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+ nonMatchWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+
+ public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final IScalarEvaluatorFactory cond;
+ private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+ public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
+ IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+ this.cond = cond;
+ this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+ }
+
+ @Override
+ public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+ return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
+ }
+ }
+
+ public static class TuplePairEvaluator implements ITuplePairComparator {
+ private final IHyracksTaskContext ctx;
+ private IScalarEvaluator condEvaluator;
+ private final IScalarEvaluatorFactory condFactory;
+ private final IPointable p;
+ private final CompositeFrameTupleReference compositeTupleRef;
+ private final FrameTupleReference leftRef;
+ private final FrameTupleReference rightRef;
+ private final IBinaryBooleanInspector binaryBooleanInspector;
+
+ public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
+ IBinaryBooleanInspector binaryBooleanInspector) {
+ this.ctx = ctx;
+ this.condFactory = condFactory;
+ this.binaryBooleanInspector = binaryBooleanInspector;
+ this.leftRef = new FrameTupleReference();
+ this.p = VoidPointable.FACTORY.createPointable();
+ this.rightRef = new FrameTupleReference();
+ this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+ int innerIndex) throws HyracksDataException {
+ if (condEvaluator == null) {
+ try {
+ this.condEvaluator = condFactory.createScalarEvaluator(ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
+ try {
+ condEvaluator.evaluate(compositeTupleRef, p);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
+ p.getLength());
+ if (result) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ }
+
+ public static class CompositeFrameTupleReference implements IFrameTupleReference {
+
+ private final FrameTupleReference refLeft;
+ private final FrameTupleReference refRight;
+
+ public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
+ this.refLeft = refLeft;
+ this.refRight = refRight;
+ }
+
+ public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+ int innerIndex) {
+ refLeft.reset(outerAccessor, outerIndex);
+ refRight.reset(innerAccessor, innerIndex);
+ }
+
+ @Override
+ public int getFieldCount() {
+ return refLeft.getFieldCount() + refRight.getFieldCount();
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ int leftFieldCount = refLeft.getFieldCount();
+ if (fIdx < leftFieldCount) {
+ return refLeft.getFieldData(fIdx);
+ } else {
+ return refRight.getFieldData(fIdx - leftFieldCount);
+ }
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ int leftFieldCount = refLeft.getFieldCount();
+ if (fIdx < leftFieldCount) {
+ return refLeft.getFieldStart(fIdx);
+ } else {
+ return refRight.getFieldStart(fIdx - leftFieldCount);
+ }
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ int leftFieldCount = refLeft.getFieldCount();
+ if (fIdx < leftFieldCount) {
+ return refLeft.getFieldLength(fIdx);
+ } else {
+ return refRight.getFieldLength(fIdx - leftFieldCount);
+ }
+ }
+
+ @Override
+ public IFrameTupleAccessor getFrameTupleAccessor() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getTupleIndex() {
+ throw new NotImplementedException();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c1f984e6/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 56ea55e..3332836 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -44,7 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.Log
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.NLJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
@@ -80,12 +80,12 @@ public class JoinUtils {
}
}
} else {
- setNLJoinOp(op, context);
+ setNestedLoopJoinOp(op, context);
}
}
- private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
- op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
+ private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
+ op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
context.getPhysicalOptimizationConfig().getMaxFramesForJoin()));
}