You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:34 UTC
[21/51] [partial] incubator-asterixdb-hyracks git commit: Change
folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..4cb111d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
new file mode 100644
index 0000000..e1f897c
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobBuilder implements IHyracksJobBuilder {
+
+ private JobSpecification jobSpec;
+ private AlgebricksPartitionConstraint clusterLocations;
+
+ private Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
+ private Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
+ private Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>>();
+
+ private Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>>();
+ private Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<IPushRuntimeFactory, ILogicalOperator>();
+ private Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<ILogicalOperator, IOperatorDescriptor>();
+ private Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<ILogicalOperator, AlgebricksPartitionConstraint>();
+
+ private int aodCounter = 0;
+ private Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<ILogicalOperator, Integer>();
+ private Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>>();
+ private Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<Integer, AlgebricksMetaOperatorDescriptor>();
+ private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<IOperatorDescriptor, AlgebricksPartitionConstraint>();
+
+ public JobBuilder(JobSpecification jobSpec, AlgebricksPartitionConstraint clusterLocations) {
+ this.jobSpec = jobSpec;
+ this.clusterLocations = clusterLocations;
+ }
+
+ @Override
+ public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc) {
+ contributeMicroOperator(op, runtime, recDesc, null);
+ }
+
+ @Override
+ public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc,
+ AlgebricksPartitionConstraint pc) {
+ microOps.put(op, new Pair<IPushRuntimeFactory, RecordDescriptor>(runtime, recDesc));
+ revMicroOpMap.put(runtime, op);
+ if (pc != null) {
+ pcForMicroOps.put(op, pc);
+ }
+ AbstractLogicalOperator logicalOp = (AbstractLogicalOperator) op;
+ if (logicalOp.getExecutionMode() == ExecutionMode.UNPARTITIONED && pc == null) {
+ AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
+ pcForMicroOps.put(logicalOp, apc);
+ }
+ }
+
+ @Override
+ public void contributeConnector(ILogicalOperator exchgOp, IConnectorDescriptor conn) {
+ connectors.put(exchgOp, new Pair<IConnectorDescriptor, TargetConstraint>(conn, null));
+ }
+
+ @Override
+ public void contributeConnectorWithTargetConstraint(ILogicalOperator exchgOp, IConnectorDescriptor conn,
+ TargetConstraint numberOfTargetPartitions) {
+ connectors.put(exchgOp, new Pair<IConnectorDescriptor, TargetConstraint>(conn, numberOfTargetPartitions));
+ }
+
+ @Override
+ public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest, int destInputIndex) {
+ ArrayList<ILogicalOperator> outputs = outEdges.get(src);
+ if (outputs == null) {
+ outputs = new ArrayList<ILogicalOperator>();
+ outEdges.put(src, outputs);
+ }
+ addAtPos(outputs, dest, srcOutputIndex);
+
+ ArrayList<ILogicalOperator> inp = inEdges.get(dest);
+ if (inp == null) {
+ inp = new ArrayList<ILogicalOperator>();
+ inEdges.put(dest, inp);
+ }
+ addAtPos(inp, src, destInputIndex);
+ }
+
+ @Override
+ public void contributeHyracksOperator(ILogicalOperator op, IOperatorDescriptor opDesc) {
+ hyracksOps.put(op, opDesc);
+ }
+
+ @Override
+ public void contributeAlgebricksPartitionConstraint(IOperatorDescriptor opDesc, AlgebricksPartitionConstraint apc) {
+ partitionConstraintMap.put(opDesc, apc);
+ }
+
+ @Override
+ public JobSpecification getJobSpec() {
+ return jobSpec;
+ }
+
+ @Override
+ public void buildSpec(List<ILogicalOperator> roots) throws AlgebricksException {
+ buildAsterixComponents();
+ Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = setupConnectors();
+ for (ILogicalOperator r : roots) {
+ IOperatorDescriptor opDesc = findOpDescForAlgebraicOp(r);
+ jobSpec.addRoot(opDesc);
+ }
+ setAllPartitionConstraints(tgtConstraints);
+ }
+
+ private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints) {
+ List<OperatorDescriptorId> roots = jobSpec.getRoots();
+ setSpecifiedPartitionConstraints();
+ for (OperatorDescriptorId rootId : roots) {
+ setPartitionConstraintsDFS(rootId, tgtConstraints, null);
+ }
+ }
+
+ private void setSpecifiedPartitionConstraints() {
+ for (ILogicalOperator op : pcForMicroOps.keySet()) {
+ AlgebricksPartitionConstraint pc = pcForMicroOps.get(op);
+ Integer k = algebraicOpBelongingToMetaAsterixOp.get(op);
+ AlgebricksMetaOperatorDescriptor amod = metaAsterixOps.get(k);
+ partitionConstraintMap.put(amod, pc);
+ }
+ for (IOperatorDescriptor opDesc : partitionConstraintMap.keySet()) {
+ AlgebricksPartitionConstraint pc = partitionConstraintMap.get(opDesc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, pc);
+ }
+ }
+
+ private void setPartitionConstraintsDFS(OperatorDescriptorId opId,
+ Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp) {
+ List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
+ AlgebricksPartitionConstraint opConstraint = null;
+ IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
+ if (opInputs != null) {
+ for (IConnectorDescriptor conn : opInputs) {
+ ConnectorDescriptorId cid = conn.getConnectorId();
+ org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
+ .getConnectorOperatorMap().get(cid);
+ IOperatorDescriptor src = p.getLeft().getLeft();
+ // DFS
+ setPartitionConstraintsDFS(src.getOperatorId(), tgtConstraints, opDesc);
+
+ TargetConstraint constraint = tgtConstraints.get(conn);
+ if (constraint != null) {
+ switch (constraint) {
+ case ONE: {
+ opConstraint = new AlgebricksCountPartitionConstraint(1);
+ break;
+ }
+ case SAME_COUNT: {
+ opConstraint = partitionConstraintMap.get(src);
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (partitionConstraintMap.get(opDesc) == null) {
+ if (opConstraint == null) {
+ if (parentOp != null) {
+ AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp);
+ if (pc != null) {
+ opConstraint = pc;
+ } else if (opInputs == null || opInputs.size() == 0) {
+ opConstraint = new AlgebricksCountPartitionConstraint(1);
+ }
+ }
+ if (opConstraint == null) {
+ opConstraint = clusterLocations;
+ }
+ }
+ partitionConstraintMap.put(opDesc, opConstraint);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+ }
+ }
+
+ private Map<IConnectorDescriptor, TargetConstraint> setupConnectors() throws AlgebricksException {
+ Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<IConnectorDescriptor, TargetConstraint>();
+ for (ILogicalOperator exchg : connectors.keySet()) {
+ ILogicalOperator inOp = inEdges.get(exchg).get(0);
+ ILogicalOperator outOp = outEdges.get(exchg).get(0);
+ IOperatorDescriptor inOpDesc = findOpDescForAlgebraicOp(inOp);
+ IOperatorDescriptor outOpDesc = findOpDescForAlgebraicOp(outOp);
+ Pair<IConnectorDescriptor, TargetConstraint> connPair = connectors.get(exchg);
+ IConnectorDescriptor conn = connPair.first;
+ int producerPort = outEdges.get(inOp).indexOf(exchg);
+ int consumerPort = inEdges.get(outOp).indexOf(exchg);
+ jobSpec.connect(conn, inOpDesc, producerPort, outOpDesc, consumerPort);
+ if (connPair.second != null) {
+ tgtConstraints.put(conn, connPair.second);
+ }
+ }
+ return tgtConstraints;
+ }
+
+ private IOperatorDescriptor findOpDescForAlgebraicOp(ILogicalOperator op) throws AlgebricksException {
+ IOperatorDescriptor hOpDesc = hyracksOps.get(op);
+ if (hOpDesc != null) {
+ return hOpDesc;
+ }
+ Integer metaOpKey = algebraicOpBelongingToMetaAsterixOp.get(op);
+ if (metaOpKey == null) {
+ throw new AlgebricksException("Could not generate operator descriptor for operator " + op);
+ }
+ return metaAsterixOps.get(metaOpKey);
+ }
+
+ private void buildAsterixComponents() {
+ for (ILogicalOperator aop : microOps.keySet()) {
+ addMicroOpToMetaRuntimeOp(aop);
+ }
+ for (Integer k : metaAsterixOpSkeletons.keySet()) {
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> opContents = metaAsterixOpSkeletons.get(k);
+ AlgebricksMetaOperatorDescriptor amod = buildMetaAsterixOpDesc(opContents);
+ metaAsterixOps.put(k, amod);
+ }
+ }
+
+ private AlgebricksMetaOperatorDescriptor buildMetaAsterixOpDesc(
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> opContents) {
+ // RecordDescriptor outputRecordDesc = null;
+ int n = opContents.size();
+ IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n];
+ RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n];
+ int i = 0;
+ for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) {
+ runtimeFactories[i] = p.first;
+ internalRecordDescriptors[i] = p.second;
+ // if (i == n - 1) {
+ // outputRecordDesc = p.second;
+ // }
+ i++;
+ }
+ ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n - 1]);
+ ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp);
+ int outArity = (outOps == null) ? 0 : outOps.size();
+ ILogicalOperator firstLogicalOp = revMicroOpMap.get(runtimeFactories[0]);
+ ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp);
+ int inArity = (inOps == null) ? 0 : inOps.size();
+ // boolean isLeafOp = inEdges.get(firstLogicalOp) == null;
+ return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity, outArity, runtimeFactories,
+ internalRecordDescriptors);
+ }
+
+ private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) {
+ Integer k = algebraicOpBelongingToMetaAsterixOp.get(aop);
+ if (k == null) {
+ k = createNewMetaOpInfo(aop);
+ }
+ ArrayList<ILogicalOperator> destList = outEdges.get(aop);
+ if (destList == null || destList.size() != 1) {
+ // for now, we only support linear plans inside meta-ops.
+ return;
+ }
+ ILogicalOperator dest = destList.get(0);
+ Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest);
+ if (j == null && microOps.get(dest) != null) {
+ algebraicOpBelongingToMetaAsterixOp.put(dest, k);
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = metaAsterixOpSkeletons.get(k);
+ aodContent1.add(microOps.get(dest));
+ } else if (j != null && j.intValue() != k.intValue()) {
+ // merge the j component into the k component
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = metaAsterixOpSkeletons.get(k);
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent2 = metaAsterixOpSkeletons.get(j);
+ aodContent1.addAll(aodContent2);
+ metaAsterixOpSkeletons.remove(j);
+ for (ILogicalOperator m : algebraicOpBelongingToMetaAsterixOp.keySet()) {
+ Integer g = algebraicOpBelongingToMetaAsterixOp.get(m);
+ if (g.intValue() == j.intValue()) {
+ algebraicOpBelongingToMetaAsterixOp.put(m, k);
+ }
+ }
+ }
+
+ }
+
+ private int createNewMetaOpInfo(ILogicalOperator aop) {
+ int n = aodCounter;
+ aodCounter++;
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> metaOpContents = new ArrayList<Pair<IPushRuntimeFactory, RecordDescriptor>>();
+ metaOpContents.add(microOps.get(aop));
+ metaAsterixOpSkeletons.put(n, metaOpContents);
+ algebraicOpBelongingToMetaAsterixOp.put(aop, n);
+ return n;
+ }
+
+ private <E> void addAtPos(ArrayList<E> a, E elem, int pos) {
+ int n = a.size();
+ if (n > pos) {
+ a.set(pos, elem);
+ } else {
+ for (int k = n; k < pos; k++) {
+ a.add(null);
+ }
+ a.add(elem);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
new file mode 100644
index 0000000..a181304
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
+
+public class JobGenContext {
+ private final IOperatorSchema outerFlowSchema;
+ private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+ private final ISerializerDeserializerProvider serializerDeserializerProvider;
+ private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+ private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+ private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+ private final IPrinterFactoryProvider printerFactoryProvider;
+ private final ITypeTraitProvider typeTraitProvider;
+ private final IMetadataProvider<?, ?> metadataProvider;
+ private final INullWriterFactory nullWriterFactory;
+ private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+ private final Object appContext;
+ private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+ private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+ private final IExpressionRuntimeProvider expressionRuntimeProvider;
+ private final IExpressionTypeComputer expressionTypeComputer;
+ private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+ private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+ private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
+ private final int frameSize;
+ private AlgebricksPartitionConstraint clusterLocations;
+ private int varCounter;
+ private final ITypingContext typingContext;
+
+ public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
+ ISerializerDeserializerProvider serializerDeserializerProvider,
+ IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+ IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
+ IBinaryBooleanInspectorFactory booleanInspectorFactory,
+ IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+ INullWriterFactory nullWriterFactory,
+ INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+ IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
+ INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IPartialAggregationTypeComputer partialAggregationTypeComputer,
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize,
+ AlgebricksPartitionConstraint clusterLocations) {
+ this.outerFlowSchema = outerFlowSchema;
+ this.metadataProvider = metadataProvider;
+ this.appContext = appContext;
+ this.serializerDeserializerProvider = serializerDeserializerProvider;
+ this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+ this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+ this.comparatorFactoryProvider = comparatorFactoryProvider;
+ this.typeTraitProvider = typeTraitProvider;
+ this.booleanInspectorFactory = booleanInspectorFactory;
+ this.integerInspectorFactory = integerInspectorFactory;
+ this.printerFactoryProvider = printerFactoryProvider;
+ this.clusterLocations = clusterLocations;
+ this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+ this.nullWriterFactory = nullWriterFactory;
+ this.expressionRuntimeProvider = expressionRuntimeProvider;
+ this.expressionTypeComputer = expressionTypeComputer;
+ this.typingContext = typingContext;
+ this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+ this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+ this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+ this.frameSize = frameSize;
+ this.varCounter = 0;
+ }
+
+ public IOperatorSchema getOuterFlowSchema() {
+ return outerFlowSchema;
+ }
+
+ public AlgebricksPartitionConstraint getClusterLocations() {
+ return clusterLocations;
+ }
+
+ public IMetadataProvider<?, ?> getMetadataProvider() {
+ return metadataProvider;
+ }
+
+ public Object getAppContext() {
+ return appContext;
+ }
+
+ public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+ return serializerDeserializerProvider;
+ }
+
+ public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+ return hashFunctionFactoryProvider;
+ }
+
+ public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+ return hashFunctionFamilyProvider;
+ }
+
+ public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+ return comparatorFactoryProvider;
+ }
+
+ public ITypeTraitProvider getTypeTraitProvider() {
+ return typeTraitProvider;
+ }
+
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return booleanInspectorFactory;
+ }
+
+ public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+ return integerInspectorFactory;
+ }
+
+ public IPrinterFactoryProvider getPrinterFactoryProvider() {
+ return printerFactoryProvider;
+ }
+
+ public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
+ return predEvaluatorFactoryProvider;
+ }
+
+ public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+ return expressionRuntimeProvider;
+ }
+
+ public IOperatorSchema getSchema(ILogicalOperator op) {
+ return schemaMap.get(op);
+ }
+
+ public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+ schemaMap.put(op, schema);
+ }
+
+ public LogicalVariable createNewVar() {
+ varCounter++;
+ LogicalVariable var = new LogicalVariable(-varCounter);
+ return var;
+ }
+
+ public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
+ return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
+ }
+
+ public INullWriterFactory getNullWriterFactory() {
+ return nullWriterFactory;
+ }
+
+ public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+ return normalizedKeyComputerFactoryProvider;
+ }
+
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+ return expressionEvalSizeComputer;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+ return partialAggregationTypeComputer;
+ }
+
+ public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+ return typingContext.getOutputTypeEnvironment(op);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
new file mode 100644
index 0000000..2d195e4
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public final class JobGenHelper {
+
+ private static final Logger LOGGER = Logger.getLogger(JobGenHelper.class.getName());
+
+ @SuppressWarnings("rawtypes")
+ public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
+ JobGenContext context) throws AlgebricksException {
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+ ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
+ ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
+ ITypeTraitProvider ttp = context.getTypeTraitProvider();
+ int i = 0;
+ for (LogicalVariable var : opSchema) {
+ Object t = env.getVarType(var);
+ if (t == null) {
+ LOGGER.warning("No type for variable " + var);
+ }
+ fields[i] = sdp.getSerializerDeserializer(t);
+ typeTraits[i] = ttp.getTypeTrait(t);
+ i++;
+ }
+ return new RecordDescriptor(fields, typeTraits);
+ }
+
+ public static IPrinterFactory[] mkPrinterFactories(IOperatorSchema opSchema, IVariableTypeEnvironment env,
+ JobGenContext context, int[] printColumns) throws AlgebricksException {
+ IPrinterFactory[] pf = new IPrinterFactory[printColumns.length];
+ IPrinterFactoryProvider pff = context.getPrinterFactoryProvider();
+ for (int i = 0; i < pf.length; i++) {
+ LogicalVariable v = opSchema.getVariable(printColumns[i]);
+ Object t = env.getVarType(v);
+ pf[i] = pff.getPrinterFactory(t);
+ }
+ return pf;
+ }
+
+ public static int[] variablesToFieldIndexes(Collection<LogicalVariable> varLogical, IOperatorSchema opSchema) {
+ int[] tuplePos = new int[varLogical.size()];
+ int i = 0;
+ for (LogicalVariable var : varLogical) {
+ tuplePos[i] = opSchema.findVariable(var);
+ i++;
+ }
+ return tuplePos;
+ }
+
+ public static IBinaryHashFunctionFactory[] variablesToBinaryHashFunctionFactories(
+ Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+ throws AlgebricksException {
+ IBinaryHashFunctionFactory[] funFactories = new IBinaryHashFunctionFactory[varLogical.size()];
+ int i = 0;
+ IBinaryHashFunctionFactoryProvider bhffProvider = context.getBinaryHashFunctionFactoryProvider();
+ for (LogicalVariable var : varLogical) {
+ Object type = env.getVarType(var);
+ funFactories[i++] = bhffProvider.getBinaryHashFunctionFactory(type);
+ }
+ return funFactories;
+ }
+
+ public static IBinaryHashFunctionFamily[] variablesToBinaryHashFunctionFamilies(
+ Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+ throws AlgebricksException {
+ IBinaryHashFunctionFamily[] funFamilies = new IBinaryHashFunctionFamily[varLogical.size()];
+ int i = 0;
+ IBinaryHashFunctionFamilyProvider bhffProvider = context.getBinaryHashFunctionFamilyProvider();
+ for (LogicalVariable var : varLogical) {
+ Object type = env.getVarType(var);
+ funFamilies[i++] = bhffProvider.getBinaryHashFunctionFamily(type);
+ }
+ return funFamilies;
+ }
+
+ public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
+ Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+ throws AlgebricksException {
+ IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[varLogical.size()];
+ IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+ int i = 0;
+ for (LogicalVariable v : varLogical) {
+ Object type = env.getVarType(v);
+ compFactories[i++] = bcfProvider.getBinaryComparatorFactory(type, true);
+ }
+ return compFactories;
+ }
+
+ public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(List<LogicalVariable> varLogical,
+ int start, int size, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+ IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[size];
+ IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+ for (int i = 0; i < size; i++) {
+ Object type = env.getVarType(varLogical.get(start + i));
+ compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+ }
+ return compFactories;
+ }
+
+ public static INormalizedKeyComputerFactory variablesToAscNormalizedKeyComputerFactory(
+ Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+ throws AlgebricksException {
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ if (nkcfProvider == null)
+ return null;
+ for (LogicalVariable v : varLogical) {
+ Object type = env.getVarType(v);
+ return nkcfProvider.getNormalizedKeyComputerFactory(type, true);
+ }
+ return null;
+ }
+
+ public static ITypeTraits[] variablesToTypeTraits(Collection<LogicalVariable> varLogical,
+ IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+ ITypeTraits[] typeTraits = new ITypeTraits[varLogical.size()];
+ ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
+ int i = 0;
+ for (LogicalVariable v : varLogical) {
+ Object type = env.getVarType(v);
+ typeTraits[i++] = typeTraitProvider.getTypeTrait(type);
+ }
+ return typeTraits;
+ }
+
+ public static ITypeTraits[] variablesToTypeTraits(List<LogicalVariable> varLogical, int start, int size,
+ IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+ ITypeTraits[] typeTraits = new ITypeTraits[size];
+ ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
+ for (int i = 0; i < size; i++) {
+ Object type = env.getVarType(varLogical.get(start + i));
+ typeTraits[i] = typeTraitProvider.getTypeTrait(type);
+ }
+ return typeTraits;
+ }
+
+ public static int[] projectAllVariables(IOperatorSchema opSchema) {
+ int[] projectionList = new int[opSchema.getSize()];
+ int k = 0;
+ for (LogicalVariable v : opSchema) {
+ projectionList[k++] = opSchema.findVariable(v);
+ }
+ return projectionList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
new file mode 100644
index 0000000..dc1fb35
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+
+public class OperatorSchemaImpl implements IOperatorSchema {
+ private final Map<LogicalVariable, Integer> varMap;
+
+ private final List<LogicalVariable> varList;
+
+ public OperatorSchemaImpl() {
+ varMap = new HashMap<LogicalVariable, Integer>();
+ varList = new ArrayList<LogicalVariable>();
+ }
+
+ @Override
+ public void addAllVariables(IOperatorSchema source) {
+ for (LogicalVariable v : source) {
+ varMap.put(v, varList.size());
+ varList.add(v);
+ }
+ }
+
+ @Override
+ public void addAllNewVariables(IOperatorSchema source) {
+ for (LogicalVariable v : source) {
+ if (varMap.get(v) == null) {
+ varMap.put(v, varList.size());
+ varList.add(v);
+ }
+ }
+ }
+
+ @Override
+ public int addVariable(LogicalVariable var) {
+ int idx = varList.size();
+ varMap.put(var, idx);
+ varList.add(var);
+ return idx;
+ }
+
+ @Override
+ public void clear() {
+ varMap.clear();
+ varList.clear();
+ }
+
+ @Override
+ public int findVariable(LogicalVariable var) {
+ Integer i = varMap.get(var);
+ if (i == null) {
+ return -1;
+ }
+ return i;
+ }
+
+ @Override
+ public int getSize() {
+ return varList.size();
+ }
+
+ @Override
+ public LogicalVariable getVariable(int index) {
+ return varList.get(index);
+ }
+
+ @Override
+ public Iterator<LogicalVariable> iterator() {
+ return varList.iterator();
+ }
+
+ @Override
+ public String toString() {
+ return varMap.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
new file mode 100644
index 0000000..3af57ad
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class PlanCompiler {
+ private JobGenContext context;
+ private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+
+ public PlanCompiler(JobGenContext context) {
+ this.context = context;
+ }
+
+ public JobGenContext getContext() {
+ return context;
+ }
+
+ public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema,
+ IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
+ JobSpecification spec = new JobSpecification(context.getFrameSize());
+ if (jobEventListenerFactory != null) {
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+ }
+ List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
+ IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
+ for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
+ compileOpRef(opRef, spec, builder, outerPlanSchema);
+ rootOps.add(opRef.getValue());
+ }
+ reviseEdges(builder);
+ operatorVisitedToParents.clear();
+ builder.buildSpec(rootOps);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ // Do not do activity cluster planning because it is slow on large clusters
+ spec.setUseConnectorPolicyForScheduling(false);
+ return spec;
+ }
+
+ private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec,
+ IHyracksJobBuilder builder, IOperatorSchema outerPlanSchema) throws AlgebricksException {
+ ILogicalOperator op = opRef.getValue();
+ int n = op.getInputs().size();
+ IOperatorSchema[] schemas = new IOperatorSchema[n];
+ int i = 0;
+ for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
+ List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2);
+ if (parents == null) {
+ parents = new ArrayList<Mutable<ILogicalOperator>>();
+ operatorVisitedToParents.put(opRef2, parents);
+ parents.add(opRef);
+ compileOpRef(opRef2, spec, builder, outerPlanSchema);
+ schemas[i++] = context.getSchema(opRef2.getValue());
+ } else {
+ if (!parents.contains(opRef))
+ parents.add(opRef);
+ schemas[i++] = context.getSchema(opRef2.getValue());
+ continue;
+ }
+ }
+
+ IOperatorSchema opSchema = new OperatorSchemaImpl();
+ context.putSchema(op, opSchema);
+ op.getVariablePropagationPolicy().propagateVariables(opSchema, schemas);
+ op.contributeRuntimeOperator(builder, context, opSchema, schemas, outerPlanSchema);
+ }
+
+ private void reviseEdges(IHyracksJobBuilder builder) {
+ /**
+ * revise the edges for the case of replicate operator
+ */
+ for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : operatorVisitedToParents
+ .entrySet()) {
+ Mutable<ILogicalOperator> child = entry.getKey();
+ List<Mutable<ILogicalOperator>> parents = entry.getValue();
+ if (parents.size() > 1) {
+ if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ ReplicateOperator rop = (ReplicateOperator) child.getValue();
+ if (rop.isBlocker()) {
+ // make the order of the graph edges consistent with the order of rop's outputs
+ List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+ for (Mutable<ILogicalOperator> parent : parents) {
+ builder.contributeGraphEdge(child.getValue(), outputs.indexOf(parent), parent.getValue(), 0);
+ }
+ } else {
+ int i = 0;
+ for (Mutable<ILogicalOperator> parent : parents) {
+ builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
+ i++;
+ }
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
new file mode 100644
index 0000000..e7b469a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.Collection;
+import java.util.logging.Level;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public abstract class AbstractRuleController {
+
+ protected IOptimizationContext context;
+
+ public AbstractRuleController() {
+ }
+
+ public void setContext(IOptimizationContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Each rewriting strategy may differ in the
+ *
+ * @param root
+ * @param ruleClasses
+ * @return true iff one of the rules in the collection fired
+ */
+ public abstract boolean rewriteWithRuleCollection(Mutable<ILogicalOperator> root,
+ Collection<IAlgebraicRewriteRule> rules) throws AlgebricksException;
+
+ /**
+ * @param opRef
+ * @param rule
+ * @return true if any rewrite was fired, either on opRef or any operator
+ * under it.
+ */
+ protected boolean rewriteOperatorRef(Mutable<ILogicalOperator> opRef, IAlgebraicRewriteRule rule)
+ throws AlgebricksException {
+ return rewriteOperatorRef(opRef, rule, true, false);
+ }
+
+ private String getPlanString(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+ if (AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(Level.FINE)) {
+ StringBuilder sb = new StringBuilder();
+ LogicalOperatorPrettyPrintVisitor pvisitor = context.getPrettyPrintVisitor();
+ PlanPrettyPrinter.printOperator((AbstractLogicalOperator) opRef.getValue(), sb, pvisitor, 0);
+ return sb.toString();
+ }
+ return null;
+ }
+
+ private void printRuleApplication(IAlgebraicRewriteRule rule, String beforePlan, String afterPlan)
+ throws AlgebricksException {
+ if (AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(Level.FINE)) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Rule " + rule.getClass() + " fired.\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Before plan\n" + beforePlan + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> After plan\n" + afterPlan + "\n");
+ }
+ }
+
+ protected boolean rewriteOperatorRef(Mutable<ILogicalOperator> opRef, IAlgebraicRewriteRule rule,
+ boolean enterNestedPlans, boolean fullDFS) throws AlgebricksException {
+
+ String preBeforePlan = getPlanString(opRef);
+ if (rule.rewritePre(opRef, context)) {
+ String preAfterPlan = getPlanString(opRef);
+ printRuleApplication(rule, preBeforePlan, preAfterPlan);
+ return true;
+ }
+ boolean rewritten = false;
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
+ for (Mutable<ILogicalOperator> inp : op.getInputs()) {
+ if (rewriteOperatorRef(inp, rule, enterNestedPlans, fullDFS)) {
+ rewritten = true;
+ if (!fullDFS) {
+ break;
+ }
+ }
+ }
+
+ if (op.hasNestedPlans() && enterNestedPlans) {
+ AbstractOperatorWithNestedPlans o2 = (AbstractOperatorWithNestedPlans) op;
+ for (ILogicalPlan p : o2.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ if (rewriteOperatorRef(r, rule, enterNestedPlans, fullDFS)) {
+ rewritten = true;
+ if (!fullDFS) {
+ break;
+ }
+ }
+ }
+ if (rewritten && !fullDFS) {
+ break;
+ }
+ }
+ }
+
+ String postBeforePlan = getPlanString(opRef);
+ if (rule.rewritePost(opRef, context)) {
+ String postAfterPlan = getPlanString(opRef);
+ printRuleApplication(rule, postBeforePlan, postAfterPlan);
+ return true;
+ }
+
+ return rewritten;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
new file mode 100644
index 0000000..361dad3
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableEvalSizeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+
+public class AlgebricksOptimizationContext implements IOptimizationContext {
+
+ private int varCounter;
+ private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+ private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
+ private final PhysicalOptimizationConfig physicalOptimizationConfig;
+ private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
+
+ Map<LogicalVariable, Integer> varSizeMap = new HashMap<LogicalVariable, Integer>();
+
+ @Override
+ public void setVariableEvalSize(LogicalVariable var, int size) {
+ varSizeMap.put(var, size);
+ }
+
+ @Override
+ public int getVariableEvalSize(LogicalVariable var) {
+ return varSizeMap.get(var);
+ }
+ };
+
+ private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<ILogicalOperator, IVariableTypeEnvironment>();
+
+ private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>();
+ private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
+ private Map<LogicalVariable, FunctionalDependency> recordToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
+
+ @SuppressWarnings("unchecked")
+ private IMetadataProvider metadataProvider;
+ private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
+
+ protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<ILogicalOperator, List<FunctionalDependency>>();
+ protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
+
+ protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
+ private final IExpressionTypeComputer expressionTypeComputer;
+ private final INullableTypeComputer nullableTypeComputer;
+ private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor;
+
+ public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+ IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+ PhysicalOptimizationConfig physicalOptimizationConfig) {
+ this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
+ nullableTypeComputer, physicalOptimizationConfig, new LogicalOperatorPrettyPrintVisitor());
+ }
+
+ public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+ IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+ PhysicalOptimizationConfig physicalOptimizationConfig, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+ this.varCounter = varCounter;
+ this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+ this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
+ this.expressionTypeComputer = expressionTypeComputer;
+ this.nullableTypeComputer = nullableTypeComputer;
+ this.physicalOptimizationConfig = physicalOptimizationConfig;
+ this.prettyPrintVisitor = prettyPrintVisitor;
+ }
+
+ public int getVarCounter() {
+ return varCounter;
+ }
+
+ public void setVarCounter(int varCounter) {
+ this.varCounter = varCounter;
+ }
+
+ public LogicalVariable newVar() {
+ varCounter++;
+ LogicalVariable var = new LogicalVariable(varCounter);
+ return var;
+ }
+
+ @SuppressWarnings("unchecked")
+ public IMetadataProvider getMetadataProvider() {
+ return metadataProvider;
+ }
+
+ public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider) {
+ this.metadataProvider = metadataProvider;
+ }
+
+ public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
+ HashSet<ILogicalOperator> operators = dontApply.get(rule);
+ if (operators == null) {
+ return false;
+ } else {
+ return operators.contains(op);
+ }
+ }
+
+ public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
+ HashSet<ILogicalOperator> operators = dontApply.get(rule);
+ if (operators == null) {
+ HashSet<ILogicalOperator> os = new HashSet<ILogicalOperator>();
+ os.add(op);
+ dontApply.put(rule, os);
+ } else {
+ operators.add(op);
+ }
+
+ }
+
+ /*
+ * returns true if op1 and op2 have already been compared
+ */
+ @Override
+ public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
+ HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
+ if (ops == null) {
+ HashSet<ILogicalOperator> newEntry = new HashSet<ILogicalOperator>();
+ newEntry.add(op2);
+ alreadyCompared.put(op1, newEntry);
+ return false;
+ } else {
+ if (ops.contains(op2)) {
+ return true;
+ } else {
+ ops.add(op2);
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public void removeFromAlreadyCompared(ILogicalOperator op1) {
+ alreadyCompared.remove(op1);
+ }
+
+ public void addNotToBeInlinedVar(LogicalVariable var) {
+ notToBeInlinedVars.add(var);
+ }
+
+ public boolean shouldNotBeInlined(LogicalVariable var) {
+ return notToBeInlinedVars.contains(var);
+ }
+
+ public void addPrimaryKey(FunctionalDependency pk) {
+ assert (pk.getTail().size() == 1);
+ LogicalVariable recordVar = pk.getTail().get(0);
+ recordToPrimaryKey.put(recordVar, pk);
+ }
+
+ public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
+ FunctionalDependency fd = recordToPrimaryKey.get(recordVar);
+ if (fd == null) {
+ return null;
+ }
+ return fd.getHead();
+ }
+
+ @Override
+ public Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op) {
+ return eqClassGlobalMap.get(op);
+ }
+
+ @Override
+ public List<FunctionalDependency> getFDList(ILogicalOperator op) {
+ return fdGlobalMap.get(op);
+ }
+
+ @Override
+ public void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap) {
+ this.eqClassGlobalMap.put(op, eqClassMap);
+ }
+
+ @Override
+ public void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList) {
+ this.fdGlobalMap.put(op, fdList);
+ }
+
+ @Override
+ public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op) {
+ return logicalProps.get(op);
+ }
+
+ @Override
+ public void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v) {
+ logicalProps.put(op, v);
+ }
+
+ @Override
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+ return expressionEvalSizeComputer;
+ }
+
+ @Override
+ public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment() {
+ return varEvalSizeEnv;
+ }
+
+ public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
+ return mergeAggregationExpressionFactory;
+ }
+
+ public PhysicalOptimizationConfig getPhysicalOptimizationConfig() {
+ return physicalOptimizationConfig;
+ }
+
+ @Override
+ public IVariableTypeEnvironment getOutputTypeEnvironment(ILogicalOperator op) {
+ return typeEnvMap.get(op);
+ }
+
+ @Override
+ public void setOutputTypeEnvironment(ILogicalOperator op, IVariableTypeEnvironment env) {
+ typeEnvMap.put(op, env);
+ }
+
+ @Override
+ public IExpressionTypeComputer getExpressionTypeComputer() {
+ return expressionTypeComputer;
+ }
+
+ @Override
+ public INullableTypeComputer getNullableTypeComputer() {
+ return nullableTypeComputer;
+ }
+
+ @Override
+ public void invalidateTypeEnvironmentForOperator(ILogicalOperator op) {
+ typeEnvMap.put(op, null);
+ }
+
+ @Override
+ public void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException {
+ setOutputTypeEnvironment(op, op.computeOutputTypeEnvironment(this));
+ }
+
+ @Override
+ public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) {
+ for (Map.Entry<LogicalVariable, FunctionalDependency> me : recordToPrimaryKey.entrySet()) {
+ FunctionalDependency fd = me.getValue();
+ List<LogicalVariable> hd = new ArrayList<LogicalVariable>();
+ for (LogicalVariable v : fd.getHead()) {
+ LogicalVariable v2 = mappedVars.get(v);
+ if (v2 == null) {
+ hd.add(v);
+ } else {
+ hd.add(v2);
+ }
+ }
+ List<LogicalVariable> tl = new ArrayList<LogicalVariable>();
+ for (LogicalVariable v : fd.getTail()) {
+ LogicalVariable v2 = mappedVars.get(v);
+ if (v2 == null) {
+ tl.add(v);
+ } else {
+ tl.add(v2);
+ }
+ }
+ me.setValue(new FunctionalDependency(hd, tl));
+ }
+ }
+
+ @Override
+ public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
+ return prettyPrintVisitor;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
new file mode 100644
index 0000000..17c6900
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public class HeuristicOptimizer {
+
+ public static PhysicalOperatorTag[] hyraxOperators = new PhysicalOperatorTag[] {
+ PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
+ PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
+ PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+ PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
+ PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.SPLIT, PhysicalOperatorTag.STABLE_SORT,
+ PhysicalOperatorTag.UNION_ALL };
+ public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
+
+ public static boolean isHyraxOp(PhysicalOperatorTag opTag) {
+ for (PhysicalOperatorTag t : hyraxOperators) {
+ if (t == opTag) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private IOptimizationContext context;
+ private List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
+ private List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+ private ILogicalPlan plan;
+
+ public HeuristicOptimizer(ILogicalPlan plan,
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
+ IOptimizationContext context) {
+ this.plan = plan;
+ this.context = context;
+ this.logicalRewrites = logicalRewrites;
+ this.physicalRewrites = physicalRewrites;
+ }
+
+ public void optimize() throws AlgebricksException {
+ if (plan == null) {
+ return;
+ }
+ if (AlgebricksConfig.DEBUG) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting logical optimizations.\n");
+ }
+
+ StringBuilder sb = new StringBuilder();
+ PlanPrettyPrinter.printPlan(plan, sb, context.getPrettyPrintVisitor(), 0);
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Logical Plan:\n" + sb.toString());
+ runOptimizationSets(plan, logicalRewrites);
+ computeSchemaBottomUpForPlan(plan);
+ runPhysicalOptimizations(plan, physicalRewrites);
+ StringBuilder sb2 = new StringBuilder();
+ PlanPrettyPrinter.printPlan(plan, sb2, context.getPrettyPrintVisitor(), 0);
+ AlgebricksConfig.ALGEBRICKS_LOGGER.info("Optimized Plan:\n" + sb2.toString());
+ }
+
+ private void runOptimizationSets(ILogicalPlan plan,
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> optimSet) throws AlgebricksException {
+ for (Pair<AbstractRuleController, List<IAlgebraicRewriteRule>> ruleList : optimSet) {
+ for (Mutable<ILogicalOperator> r : plan.getRoots()) {
+ ruleList.first.setContext(context);
+ ruleList.first.rewriteWithRuleCollection(r, ruleList.second);
+ }
+ }
+ }
+
+ private static void computeSchemaBottomUpForPlan(ILogicalPlan p) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ computeSchemaBottomUpForOp((AbstractLogicalOperator) r.getValue());
+ }
+ }
+
+ private static void computeSchemaBottomUpForOp(AbstractLogicalOperator op) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> i : op.getInputs()) {
+ computeSchemaBottomUpForOp((AbstractLogicalOperator) i.getValue());
+ }
+ if (op.hasNestedPlans()) {
+ AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+ for (ILogicalPlan p : a.getNestedPlans()) {
+ computeSchemaBottomUpForPlan(p);
+ }
+ }
+ op.recomputeSchema();
+ }
+
+ private void runPhysicalOptimizations(ILogicalPlan plan,
+ List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites)
+ throws AlgebricksException {
+ if (AlgebricksConfig.DEBUG) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting physical optimizations.\n");
+ }
+ // PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(plan);
+ runOptimizationSets(plan, physicalRewrites);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
new file mode 100644
index 0000000..6bf0b72
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+
+public interface IAlgebraicRewriteRule {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException;
+
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
new file mode 100644
index 0000000..70b7f64
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+
+public interface IOptimizationContextFactory {
+ public IOptimizationContext createOptimizationContext(int varCounter,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+ IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+ PhysicalOptimizationConfig physicalOptimizationConfig);
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
new file mode 100644
index 0000000..47d2266
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.Properties;
+
+public class PhysicalOptimizationConfig {
+ private static final int MB = 1048576;
+
+ private static final String FRAMESIZE = "FRAMESIZE";
+ private static final String MAX_FRAMES_EXTERNAL_SORT = "MAX_FRAMES_EXTERNAL_SORT";
+ private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
+ private static final String MAX_FRAMES_LEFT_INPUT_HYBRID_HASH = "MAX_FRAMES_LEFT_INPUT_HYBRID_HASH";
+ private static final String MAX_FRAMES_HYBRID_HASH = "MAX_FRAMES_HYBRID_HASH";
+ private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
+ private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
+
+ private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE";
+ private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE";
+ private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
+
+ private Properties properties = new Properties();
+
+ public PhysicalOptimizationConfig() {
+ int frameSize = 32768;
+ setInt(FRAMESIZE, frameSize);
+ setInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+ setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 32 * MB) / frameSize));
+
+ // use http://www.rsok.com/~jrm/printprimes.html to find prime numbers
+ setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
+ setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767);
+ setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767);
+ }
+
+ public int getFrameSize() {
+ return getInt(FRAMESIZE, 32768);
+ }
+
+ public void setFrameSize(int frameSize) {
+ setInt(FRAMESIZE, frameSize);
+ }
+
+ public double getFudgeFactor() {
+ return getDouble(FUDGE_FACTOR, 1.3);
+ }
+
+ public void setFudgeFactor(double fudgeFactor) {
+ setDouble(FUDGE_FACTOR, fudgeFactor);
+ }
+
+ public int getMaxRecordsPerFrame() {
+ return getInt(MAX_RECORDS_PER_FRAME, 512);
+ }
+
+ public void setMaxRecordsPerFrame(int maxRecords) {
+ setInt(MAX_RECORDS_PER_FRAME, maxRecords);
+ }
+
+ public int getMaxFramesLeftInputHybridHash() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, (int) (140L * 1024 * MB / frameSize));
+ }
+
+ public void setMaxFramesLeftInputHybridHash(int frameLimit) {
+ setInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, frameLimit);
+ }
+
+ public int getMaxFramesHybridHash() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_HYBRID_HASH, (int) (64L * MB / frameSize));
+ }
+
+ public void setMaxFramesHybridHash(int frameLimit) {
+ setInt(MAX_FRAMES_HYBRID_HASH, frameLimit);
+ }
+
+ public int getMaxFramesExternalGroupBy() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize));
+ }
+
+ public void setMaxFramesExternalGroupBy(int frameLimit) {
+ setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, frameLimit);
+ }
+
+ public int getMaxFramesExternalSort() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+ }
+
+ public void setMaxFramesExternalSort(int frameLimit) {
+ setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
+ }
+
+ public int getHashGroupByTableSize() {
+ return getInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
+ }
+
+ public void setHashGroupByTableSize(int tableSize) {
+ setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, tableSize);
+ }
+
+ public int getExternalGroupByTableSize() {
+ return getInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767);
+ }
+
+ public void setExternalGroupByTableSize(int tableSize) {
+ setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, tableSize);
+ }
+
+ public int getInMemHashJoinTableSize() {
+ return getInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767);
+ }
+
+ public void setInMemHashJoinTableSize(int tableSize) {
+ setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize);
+ }
+
+ private void setInt(String property, int value) {
+ properties.setProperty(property, Integer.toString(value));
+ }
+
+ private int getInt(String property, int defaultValue) {
+ String value = properties.getProperty(property);
+ if (value == null)
+ return defaultValue;
+ else
+ return Integer.parseInt(value);
+ }
+
+ private void setDouble(String property, double value) {
+ properties.setProperty(property, Double.toString(value));
+ }
+
+ private double getDouble(String property, double defaultValue) {
+ String value = properties.getProperty(property);
+ if (value == null)
+ return defaultValue;
+ else
+ return Double.parseDouble(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
new file mode 100644
index 0000000..3ef4316
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.utils;
+
+public class Substitution<T> {
+ public T substituted;
+ public T substitutedWith;
+
+ public Substitution(T substituted, T substitutedWith) {
+ this.substituted = substituted;
+ this.substitutedWith = substitutedWith;
+ }
+}