You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:39 UTC
[26/51] [partial] incubator-asterixdb-hyracks git commit: Change
folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
new file mode 100644
index 0000000..3abdec9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
+
+public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
+
+ private final int memSizeInFrames;
+ private final int maxInputBuildSizeInFrames;
+ private final int aveRecordsPerFrame;
+ private final double fudgeFactor;
+
+ private static final Logger LOGGER = Logger.getLogger(HybridHashJoinPOperator.class.getName());
+
+ public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+ List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
+ int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
+ super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+ this.memSizeInFrames = memSizeInFrames;
+ this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
+ this.aveRecordsPerFrame = aveRecordsPerFrame;
+ this.fudgeFactor = fudgeFactor;
+
+ LOGGER.fine("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
+ + partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
+ + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames + ", int maxInputSize0InFrames="
+ + maxInputSizeInFrames + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor="
+ + fudgeFactor + ".");
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.HYBRID_HASH_JOIN;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ public double getFudgeFactor() {
+ return fudgeFactor;
+ }
+
+ public int getMemSizeInFrames() {
+ return memSizeInFrames;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+ int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+ keysLeftBranch, env, context);
+ IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
+ keysLeftBranch, env, context);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+ int i = 0;
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ for (LogicalVariable v : keysLeftBranch) {
+ Object t = env.getVarType(v);
+ comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+ }
+
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context
+ .getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null
+ : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IOperatorDescriptor opDesc = null;
+
+ boolean optimizedHashJoin = true;
+ for (IBinaryHashFunctionFamily family : hashFunFamilies) {
+ if (family == null) {
+ optimizedHashJoin = false;
+ break;
+ }
+ }
+
+ if (!optimizedHashJoin) {
+ try {
+ switch (kind) {
+ case INNER: {
+ opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+ hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory);
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+ hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true,
+ nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ } else {
+ try {
+ switch (kind) {
+ case INNER: {
+ opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+ keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+ keysRight, keysLeft), predEvaluatorFactory);
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+ keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+ keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+
+ @Override
+ protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ return new LinkedList<ILocalStructuralProperty>();
+ }
+
+}
+
+/**
+ * {@ ITuplePairComparatorFactory} implementation for optimized hybrid hash join.
+ */
+class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryComparatorFactory[] binaryComparatorFactories;
+ private final int[] keysLeft;
+ private final int[] keysRight;
+
+ public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
+ int[] keysRight) {
+ this.binaryComparatorFactories = binaryComparatorFactory;
+ this.keysLeft = keysLeft;
+ this.keysRight = keysRight;
+ }
+
+ @Override
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+ IBinaryComparator[] binaryComparators = new IBinaryComparator[binaryComparatorFactories.length];
+ for (int i = 0; i < binaryComparators.length; i++) {
+ binaryComparators[i] = binaryComparatorFactories[i].createBinaryComparator();
+ }
+ return new JoinMultiComparator(binaryComparators, keysLeft, keysRight);
+ }
+}
+
+/**
+ * {@ ITuplePairComparator} implementation for optimized hybrid hash join.
+ * The comparator applies multiple binary comparators, one for each key pairs
+ */
+class JoinMultiComparator implements ITuplePairComparator {
+ private final IBinaryComparator[] binaryComparators;
+ private final int[] keysLeft;
+ private final int[] keysRight;
+
+ public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+ this.binaryComparators = bComparator;
+ this.keysLeft = keysLeft;
+ this.keysRight = keysRight;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ for (int i = 0; i < binaryComparators.length; i++) {
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, keysLeft[i]);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, keysLeft[i]);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, keysRight[i]);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, keysRight[i]);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = binaryComparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+ accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
new file mode 100644
index 0000000..55616f9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+
+public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
+
+ private final int tableSize;
+
+ /**
+ * builds on the first operator and probes on the second.
+ */
+
+ public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+ List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
+ super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+ this.tableSize = tableSize;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.IN_MEMORY_HASH_JOIN;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+ int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+ keysLeftBranch, env, context);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+ int i = 0;
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ for (LogicalVariable v : keysLeftBranch) {
+ Object t = env.getVarType(v);
+ comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+ }
+
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory predEvaluatorFactory = ( predEvaluatorFactoryProvider == null ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IOperatorDescriptor opDesc = null;
+
+ switch (kind) {
+ case INNER: {
+ opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
+ comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
+ comparatorFactories, predEvaluatorFactory, recDescriptor, true, nullWriterFactories, tableSize);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+
+ @Override
+ protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+ List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
+ if (lp0 != null) {
+ // maintains the local properties on the probe side
+ return new LinkedList<ILocalStructuralProperty>(lp0);
+ }
+ return new LinkedList<ILocalStructuralProperty>();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
new file mode 100644
index 0000000..4a7d875
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryStableSortPOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.sort.InMemorySortRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class InMemoryStableSortPOperator extends AbstractStableSortPOperator {
+
+ public InMemoryStableSortPOperator() {
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.IN_MEMORY_STABLE_SORT;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ int n = sortColumns.length;
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+ int i = 0;
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ for (OrderColumn oc : sortColumns) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+
+ IPushRuntimeFactory runtime = new InMemorySortRuntimeFactory(sortFields, nkcf, comps, null);
+ builder.contributeMicroOperator(op, runtime, recDescriptor);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
new file mode 100644
index 0000000..db00b9e
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
+
+ private final List<LogicalVariable> primaryKeys;
+ private final List<LogicalVariable> secondaryKeys;
+ private final List<LogicalVariable> additionalFilteringKeys;
+ private final Mutable<ILogicalExpression> filterExpr;
+ private final IDataSourceIndex<?, ?> dataSourceIndex;
+
+ public IndexBulkloadPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+ IDataSourceIndex<?, ?> dataSourceIndex) {
+ this.primaryKeys = primaryKeys;
+ this.secondaryKeys = secondaryKeys;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ this.filterExpr = filterExpr;
+ this.dataSourceIndex = dataSourceIndex;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.INDEX_BULKLOAD;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<>();
+ scanVariables.addAll(primaryKeys);
+ scanVariables.add(new LogicalVariable(-1));
+ IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computePropertiesVector(scanVariables);
+ List<ILocalStructuralProperty> localProperties = new ArrayList<>();
+ List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ // Data needs to be sorted based on the [token, number of token, PK]
+ // OR [token, PK] if the index is not partitioned
+ for (LogicalVariable skVar : secondaryKeys) {
+ orderColumns.add(new OrderColumn(skVar, OrderKind.ASC));
+ }
+ for (LogicalVariable pkVar : primaryKeys) {
+ orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC));
+ }
+ localProperties.add(new LocalOrderProperty(orderColumns));
+ StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
+ localProperties);
+ return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+ assert indexInsertDeleteOp.getOperation() == Kind.INSERT;
+ assert indexInsertDeleteOp.isBulkload();
+
+ IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getIndexInsertRuntime(
+ dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys,
+ additionalFilteringKeys, null, inputDesc, context, spec, true);
+ builder.contributeHyracksOperator(indexInsertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = indexInsertDeleteOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, indexInsertDeleteOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
new file mode 100644
index 0000000..4bc4c06
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class IndexInsertDeletePOperator extends AbstractPhysicalOperator {
+
+ private final List<LogicalVariable> primaryKeys;
+ private final List<LogicalVariable> secondaryKeys;
+ private final ILogicalExpression filterExpr;
+ private final IDataSourceIndex<?, ?> dataSourceIndex;
+ private final List<LogicalVariable> additionalFilteringKeys;
+
+ public IndexInsertDeletePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+ IDataSourceIndex<?, ?> dataSourceIndex) {
+ this.primaryKeys = primaryKeys;
+ this.secondaryKeys = secondaryKeys;
+ if (filterExpr != null) {
+ this.filterExpr = filterExpr.getValue();
+ } else {
+ this.filterExpr = null;
+ }
+ this.dataSourceIndex = dataSourceIndex;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.INDEX_INSERT_DELETE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+ scanVariables.addAll(primaryKeys);
+ scanVariables.add(new LogicalVariable(-1));
+ IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computePropertiesVector(scanVariables);
+ r.getLocalProperties().clear();
+ IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+ requirements[0] = r;
+ return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ IndexInsertDeleteOperator insertDeleteOp = (IndexInsertDeleteOperator) op;
+ IMetadataProvider mp = context.getMetadataProvider();
+
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
+ if (insertDeleteOp.getOperation() == Kind.INSERT) {
+ runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
+ } else {
+ runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+ }
+ builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
new file mode 100644
index 0000000..993e8d7
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings("rawtypes")
+public class InsertDeletePOperator extends AbstractPhysicalOperator {
+
+ private LogicalVariable payload;
+ private List<LogicalVariable> keys;
+ private IDataSource<?> dataSource;
+ private final List<LogicalVariable> additionalFilteringKeys;
+
+ public InsertDeletePOperator(LogicalVariable payload, List<LogicalVariable> keys,
+ List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource) {
+ this.payload = payload;
+ this.keys = keys;
+ this.dataSource = dataSource;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.INSERT_DELETE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
+ scanVariables.addAll(keys);
+ scanVariables.add(new LogicalVariable(-1));
+ IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
+ r.getLocalProperties().clear();
+ IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
+ requirements[0] = r;
+ return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+ IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
+ if (insertDeleteOp.getOperation() == Kind.INSERT) {
+ runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ additionalFilteringKeys, inputDesc, context, spec, false);
+ } else {
+ runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ additionalFilteringKeys, inputDesc, context, spec);
+ }
+
+ builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
new file mode 100644
index 0000000..27f4b25
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+
+public class MaterializePOperator extends AbstractPhysicalOperator {
+
+ private final boolean isSingleActivity;
+
+ public MaterializePOperator(boolean isSingleActivity) {
+ this.isSingleActivity = isSingleActivity;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.MATERIALIZE;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+ MaterializingOperatorDescriptor materializationOpDesc = new MaterializingOperatorDescriptor(
+ builder.getJobSpec(), recDescriptor, isSingleActivity);
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, materializationOpDesc);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels;
+ if (isSingleActivity) {
+ outputDependencyLabels = new int[] { 0 };
+ } else {
+ outputDependencyLabels = new int[] { 1 };
+ }
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
new file mode 100644
index 0000000..546a77b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.NestedPlansAccumulatingAggregatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
+
+ public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+ super(columnList);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.MICRO_PRE_CLUSTERED_GROUP_BY;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
+ GroupByOperator gby = (GroupByOperator) op;
+ int numFds = gby.getDecorList().size();
+ int fdColumns[] = new int[numFds];
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int j = 0;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("pre-sorted group-by expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ fdColumns[j++] = inputSchemas[0].findVariable(decor);
+ }
+ // compile subplans and set the gby op. schema accordingly
+ AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
+ IAggregatorDescriptorFactory aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys,
+ fdColumns);
+
+ IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+ columnList, env, context);
+ RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+ RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()),
+ inputSchemas[0], context);
+ MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
+ comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+ builder.contributeMicroOperator(gby, runtime, recordDescriptor);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
new file mode 100644
index 0000000..a2e697d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+
+/**
+ * Left input is broadcast and preserves its local properties.
+ * Right input can be partitioned in any way.
+ */
+public class NLJoinPOperator extends AbstractJoinPOperator {
+
+ private final int memSize;
+
+ public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+ super(kind, partitioningType);
+ this.memSize = memSize;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.NESTED_LOOP;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
+ if (partitioningType != JoinPartitioningType.BROADCAST) {
+ throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+ }
+
+ IPartitioningProperty pp;
+
+ AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+ if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+ IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
+ if (pv1 == null) {
+ pp = null;
+ } else {
+ pp = pv1.getPartitioningProperty();
+ }
+ } else {
+ pp = IPartitioningProperty.UNPARTITIONED;
+ }
+
+ List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
+ this.deliveredProperties = new StructuralPropertiesVector(pp, localProps);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ if (partitioningType != JoinPartitioningType.BROADCAST) {
+ throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
+ }
+
+ StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+ pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+ pv[1] = new StructuralPropertiesVector(null, null);
+ return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+ IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
+ conditionInputSchemas[0] = propagatedSchema;
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
+ context.getTypeEnvironment(op), conditionInputSchemas, context);
+ ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
+ context.getBinaryBooleanInspectorFactory());
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IOperatorDescriptor opDesc = null;
+
+ switch (kind) {
+ case INNER: {
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+ null);
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+ nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+
+ public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final IScalarEvaluatorFactory cond;
+ private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+
+ public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
+ IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
+ this.cond = cond;
+ this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
+ }
+
+ @Override
+ public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+ return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
+ }
+ }
+
+ public static class TuplePairEvaluator implements ITuplePairComparator {
+ private final IHyracksTaskContext ctx;
+ private IScalarEvaluator condEvaluator;
+ private final IScalarEvaluatorFactory condFactory;
+ private final IPointable p;
+ private final CompositeFrameTupleReference compositeTupleRef;
+ private final FrameTupleReference leftRef;
+ private final FrameTupleReference rightRef;
+ private final IBinaryBooleanInspector binaryBooleanInspector;
+
+ public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
+ IBinaryBooleanInspector binaryBooleanInspector) {
+ this.ctx = ctx;
+ this.condFactory = condFactory;
+ this.binaryBooleanInspector = binaryBooleanInspector;
+ this.leftRef = new FrameTupleReference();
+ this.p = VoidPointable.FACTORY.createPointable();
+ this.rightRef = new FrameTupleReference();
+ this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+ int innerIndex) throws HyracksDataException {
+ if (condEvaluator == null) {
+ try {
+ this.condEvaluator = condFactory.createScalarEvaluator(ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
+ try {
+ condEvaluator.evaluate(compositeTupleRef, p);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ boolean result = binaryBooleanInspector
+ .getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+ if (result)
+ return 0;
+ else
+ return 1;
+ }
+ }
+
+ public static class CompositeFrameTupleReference implements IFrameTupleReference {
+
+ private final FrameTupleReference refLeft;
+ private final FrameTupleReference refRight;
+
+ public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
+ this.refLeft = refLeft;
+ this.refRight = refRight;
+ }
+
+ public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
+ int innerIndex) {
+ refLeft.reset(outerAccessor, outerIndex);
+ refRight.reset(innerAccessor, innerIndex);
+ }
+
+ @Override
+ public int getFieldCount() {
+ return refLeft.getFieldCount() + refRight.getFieldCount();
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ int leftFieldCount = refLeft.getFieldCount();
+ if (fIdx < leftFieldCount)
+ return refLeft.getFieldData(fIdx);
+ else
+ return refRight.getFieldData(fIdx - leftFieldCount);
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ int leftFieldCount = refLeft.getFieldCount();
+ if (fIdx < leftFieldCount)
+ return refLeft.getFieldStart(fIdx);
+ else
+ return refRight.getFieldStart(fIdx - leftFieldCount);
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ int leftFieldCount = refLeft.getFieldCount();
+ if (fIdx < leftFieldCount)
+ return refLeft.getFieldLength(fIdx);
+ else
+ return refRight.getFieldLength(fIdx - leftFieldCount);
+ }
+
+ @Override
+ public IFrameTupleAccessor getFrameTupleAccessor() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getTupleIndex() {
+ throw new NotImplementedException();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
new file mode 100644
index 0000000..4fdfa32
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class NestedTupleSourcePOperator extends AbstractPhysicalOperator {
+
+ public NestedTupleSourcePOperator() {
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.NESTED_TUPLE_SOURCE;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ Mutable<ILogicalOperator> dataSource = ((NestedTupleSourceOperator) op).getDataSourceReference();
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) dataSource.getValue().getInputs().get(0).getValue();
+ IPhysicalPropertiesVector inheritedProps = op2.getDeliveredPhysicalProperties();
+ AbstractLogicalOperator parent = (AbstractLogicalOperator) dataSource.getValue();
+ if (parent.getOperatorTag() == LogicalOperatorTag.GROUP) {
+ // The following part computes the data property regarding to each particular group.
+ // TODO(buyingyi): we need to add the original data property as well. But currently
+ // there are places assuming there is only one LocalOrderProperty and one
+ // LocalGroupingProperty delivered by an operator.
+ GroupByOperator gby = (GroupByOperator) parent;
+ List<ILocalStructuralProperty> originalLocalProperties = inheritedProps.getLocalProperties();
+ List<ILocalStructuralProperty> newLocalProperties = null;
+ if (originalLocalProperties != null) {
+ newLocalProperties = new ArrayList<ILocalStructuralProperty>();
+ for (ILocalStructuralProperty lsp : inheritedProps.getLocalProperties()) {
+ ILocalStructuralProperty newLsp = lsp.regardToGroup(gby.getGbyVarList());
+ if (newLsp != null) {
+ newLocalProperties.add(newLsp);
+ }
+ }
+ }
+ deliveredProperties = new StructuralPropertiesVector(inheritedProps.getPartitioningProperty(),
+ newLocalProperties);
+ } else {
+ deliveredProperties = inheritedProps.clone();
+ }
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return null;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ propagatedSchema.addAllVariables(outerPlanSchema);
+ NestedTupleSourceRuntimeFactory runtime = new NestedTupleSourceRuntimeFactory();
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
+ builder.contributeMicroOperator(op, runtime, recDesc);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
new file mode 100644
index 0000000..456744e
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+
+public class OneToOneExchangePOperator extends AbstractExchangePOperator {
+
+ public OneToOneExchangePOperator() {
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
+ IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, TargetConstraint.SAME_COUNT);
+ }
+
+}