You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:17 UTC

[29/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
new file mode 100644
index 0000000..05c68f4
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/*
+ *
+ *  unnest $x [[ at $p ]] <- $y
+ *    aggregate $y <- function-call: listify@1(unresolved), Args:[$z]
+ *       Rest 
+ *   
+ * if $y is not used above these operators, 
+ * the plan fragment becomes
+ *
+ *  [[ runningaggregate $p <- tid]]
+ *  assign $x <- $z
+ *       Rest
+ *  
+ *
+ */
+
+public class RemoveRedundantListifyRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        // apply it only at the top of the plan
+        ILogicalOperator op = opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        Set<LogicalVariable> varSet = new HashSet<LogicalVariable>();
+        return applyRuleDown(opRef, varSet, context);
+    }
+
+    private boolean applyRuleDown(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> varSet,
+            IOptimizationContext context) throws AlgebricksException {
+        boolean changed = applies(opRef, varSet, context);
+        changed |= appliesForReverseCase(opRef, varSet, context);
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        VariableUtilities.getUsedVariables(op, varSet);
+        if (op.hasNestedPlans()) {
+            // Variables used by the parent operators should be live at op.
+            Set<LogicalVariable> localLiveVars = new ListSet<LogicalVariable>();
+            VariableUtilities.getLiveVariables(op, localLiveVars);
+            varSet.retainAll(localLiveVars);
+            AbstractOperatorWithNestedPlans aonp = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : aonp.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                    if (applyRuleDown(r, varSet, context)) {
+                        changed = true;
+                    }
+                    context.addToDontApplySet(this, r.getValue());
+                }
+            }
+        }
+        for (Mutable<ILogicalOperator> i : op.getInputs()) {
+            if (applyRuleDown(i, varSet, context)) {
+                changed = true;
+            }
+            context.addToDontApplySet(this, i.getValue());
+        }
+        return changed;
+    }
+
+    private boolean applies(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> varUsedAbove,
+            IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return false;
+        }
+        UnnestOperator unnest1 = (UnnestOperator) op1;
+        ILogicalExpression expr = unnest1.getExpressionRef().getValue();
+        LogicalVariable unnestedVar;
+        switch (expr.getExpressionTag()) {
+            case VARIABLE:
+                unnestedVar = ((VariableReferenceExpression) expr).getVariableReference();
+                break;
+            case FUNCTION_CALL:
+                if (((AbstractFunctionCallExpression) expr).getFunctionIdentifier() != AsterixBuiltinFunctions.SCAN_COLLECTION) {
+                    return false;
+                }
+                AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) expr;
+                ILogicalExpression functionCallArgExpr = functionCall.getArguments().get(0).getValue();
+                if (functionCallArgExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    return false;
+                }
+                unnestedVar = ((VariableReferenceExpression) functionCallArgExpr).getVariableReference();
+                break;
+            default:
+                return false;
+        }
+        if (varUsedAbove.contains(unnestedVar)) {
+            return false;
+        }
+
+        Mutable<ILogicalOperator> opRef2 = op1.getInputs().get(0);
+        AbstractLogicalOperator r = (AbstractLogicalOperator) opRef2.getValue();
+
+        if (r.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AggregateOperator agg = (AggregateOperator) r;
+        if (agg.getVariables().size() > 1) {
+            return false;
+        }
+        LogicalVariable aggVar = agg.getVariables().get(0);
+        ILogicalExpression aggFun = agg.getExpressions().get(0).getValue();
+        if (!aggVar.equals(unnestedVar)
+                || ((AbstractLogicalExpression) aggFun).getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) aggFun;
+        if (!AsterixBuiltinFunctions.LISTIFY.equals(f.getFunctionIdentifier())) {
+            return false;
+        }
+        if (f.getArguments().size() != 1) {
+            return false;
+        }
+        ILogicalExpression arg0 = f.getArguments().get(0).getValue();
+        if (((AbstractLogicalExpression) arg0).getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+            return false;
+        }
+        LogicalVariable paramVar = ((VariableReferenceExpression) arg0).getVariableReference();
+
+        ArrayList<LogicalVariable> assgnVars = new ArrayList<LogicalVariable>(1);
+        assgnVars.add(unnest1.getVariable());
+        ArrayList<Mutable<ILogicalExpression>> assgnExprs = new ArrayList<Mutable<ILogicalExpression>>(1);
+        assgnExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(paramVar)));
+        AssignOperator assign = new AssignOperator(assgnVars, assgnExprs);
+        assign.getInputs().add(agg.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(assign);
+        LogicalVariable posVar = unnest1.getPositionalVariable();
+
+        if (posVar == null) {
+            opRef.setValue(assign);
+        } else {
+            ArrayList<LogicalVariable> raggVars = new ArrayList<LogicalVariable>(1);
+            raggVars.add(posVar);
+            ArrayList<Mutable<ILogicalExpression>> rAggExprs = new ArrayList<Mutable<ILogicalExpression>>(1);
+            StatefulFunctionCallExpression tidFun = new StatefulFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.TID), UnpartitionedPropertyComputer.INSTANCE);
+            rAggExprs.add(new MutableObject<ILogicalExpression>(tidFun));
+            RunningAggregateOperator rAgg = new RunningAggregateOperator(raggVars, rAggExprs);
+            rAgg.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+            opRef.setValue(rAgg);
+            context.computeAndSetTypeEnvironmentForOperator(rAgg);
+        }
+
+        return true;
+    }
+
+    private boolean appliesForReverseCase(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> varUsedAbove,
+            IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AggregateOperator agg = (AggregateOperator) op1;
+        if (agg.getVariables().size() > 1 || agg.getVariables().size() <= 0) {
+            return false;
+        }
+        LogicalVariable aggVar = agg.getVariables().get(0);
+        ILogicalExpression aggFun = agg.getExpressions().get(0).getValue();
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) aggFun;
+        if (!AsterixBuiltinFunctions.LISTIFY.equals(f.getFunctionIdentifier())) {
+            return false;
+        }
+        if (f.getArguments().size() != 1) {
+            return false;
+        }
+        ILogicalExpression arg0 = f.getArguments().get(0).getValue();
+        if (((AbstractLogicalExpression) arg0).getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+            return false;
+        }
+        LogicalVariable aggInputVar = ((VariableReferenceExpression) arg0).getVariableReference();
+
+        if (agg.getInputs().size() == 0) {
+            return false;
+        }
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) agg.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return false;
+        }
+        UnnestOperator unnest = (UnnestOperator) op2;
+        if (unnest.getPositionalVariable() != null) {
+            return false;
+        }
+        if (!unnest.getVariable().equals(aggInputVar)) {
+            return false;
+        }
+        List<LogicalVariable> unnestSource = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(unnest, unnestSource);
+        if (unnestSource.size() > 1 || unnestSource.size() <= 0) {
+            return false;
+        }
+        ArrayList<LogicalVariable> assgnVars = new ArrayList<LogicalVariable>(1);
+        assgnVars.add(aggVar);
+        ArrayList<Mutable<ILogicalExpression>> assgnExprs = new ArrayList<Mutable<ILogicalExpression>>(1);
+        assgnExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(unnestSource.get(0))));
+        AssignOperator assign = new AssignOperator(assgnVars, assgnExprs);
+        assign.getInputs().add(unnest.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(assign);
+        opRef.setValue(assign);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
new file mode 100644
index 0000000..2ed2d2e
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantSelectRule.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule removes redundant select operator, e.g., select operators
+ * in which the condition is TRUE.
+ * Note that the ConstantFoldingRule will evaluate the condition expression
+ * during compile time if it is possible.
+ *
+ * @author yingyib
+ */
+public class RemoveRedundantSelectRule implements IAlgebraicRewriteRule {
+
+    /** This rule never rewrites on the way down. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Replaces a select operator whose condition always holds by the operator of its first input.
+     *
+     * @return true if {@code opRef} held such a select and was redirected; false otherwise.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator candidate = (AbstractLogicalOperator) opRef.getValue();
+        if (candidate.getOperatorTag() == LogicalOperatorTag.SELECT) {
+            SelectOperator selectOp = (SelectOperator) candidate;
+            if (alwaysHold(selectOp.getCondition().getValue())) {
+                opRef.setValue(selectOp.getInputs().get(0).getValue());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tells whether the condition is a constant TRUE.
+     *
+     * @param cond the select condition to test.
+     * @return true if {@code cond} equals {@link ConstantExpression#TRUE} or a constant expression
+     *         wrapping {@code ABoolean.TRUE}; false otherwise.
+     */
+    private boolean alwaysHold(ILogicalExpression cond) {
+        return cond.equals(ConstantExpression.TRUE)
+                || cond.equals(new ConstantExpression(new AsterixConstantValue(ABoolean.TRUE)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
new file mode 100644
index 0000000..08daa40
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class RemoveSortInFeedIngestionRule implements IAlgebraicRewriteRule {
+
+    /** This rule never rewrites on the way down. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Unlinks the order operator found directly below an insert/delete operator when the first data
+     * source scan reached by following the chain of first inputs reads from a feed.
+     *
+     * @return true if an order operator was unlinked; false otherwise.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator insertOp = (AbstractLogicalOperator) opRef.getValue();
+        if (insertOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE) {
+            return false;
+        }
+        AbstractLogicalOperator below = (AbstractLogicalOperator) insertOp.getInputs().get(0).getValue();
+
+        // Follow the first inputs downwards; stop at the first data source scan or at a leaf.
+        boolean fedByFeed = false;
+        for (AbstractLogicalOperator cursor = below; cursor != null;) {
+            if (cursor.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+                AqlDataSource scanned = (AqlDataSource) ((DataSourceScanOperator) cursor).getDataSource();
+                fedByFeed = scanned.getDatasourceType().equals(AqlDataSourceType.FEED);
+                break;
+            }
+            cursor = cursor.getInputs().isEmpty() ? null
+                    : (AbstractLogicalOperator) cursor.getInputs().get(0).getValue();
+        }
+
+        // Only an order operator that is the direct child of the insert/delete is removed.
+        if (!fedByFeed || below.getOperatorTag() != LogicalOperatorTag.ORDER) {
+            return false;
+        }
+        insertOp.getInputs().set(0, below.getInputs().get(0));
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
new file mode 100644
index 0000000..d8a8a02
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveUnusedOneToOneEquiJoinRule.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.declared.DatasetDataSource;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes join operators for which all of the following conditions are true:
+ * 1. The live variables of one input branch of the join are not used in the upstream plan
+ * 2. The join is an inner equi join
+ * 3. The join condition only uses variables that correspond to primary keys of the same dataset
+ * Notice that the last condition implies a 1:1 join, i.e., the join does not change the result cardinality.
+ * Joins that satisfy the above conditions may be introduced by other rules
+ * which use surrogate optimizations. Such an optimization aims to reduce data copies and communication costs by
+ * using the primary keys as surrogates for the desired data items. Typically,
+ * such a surrogate-based plan introduces a top-level join to finally resolve
+ * the surrogates to the desired data items.
+ * In case the upstream plan does not require the original data items at all, such a top-level join is unnecessary.
+ * The purpose of this rule is to remove such unnecessary joins.
+ */
+public class RemoveUnusedOneToOneEquiJoinRule implements IAlgebraicRewriteRule {
+
+    // Variables used by the operators visited so far; the four lists below are scratch space.
+    private final Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+    private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+    private final List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
+    private final List<DataSourceScanOperator> dataScans = new ArrayList<DataSourceScanOperator>();
+    private boolean hasRun = false;
+
+    /** Traverses the plan below {@code opRef} on the first call only; every later call returns false. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
+        hasRun = true;
+        return removeUnusedJoin(opRef);
+    }
+
+    /** No-op: all the work is done in {@link #rewritePre}. */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Records the variables used by the operator in {@code opRef}, then unhooks every removable join
+     * found among its inputs and recurses.
+     * @return true if at least one join was removed.
+     */
+    private boolean removeUnusedJoin(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+        // Propagate used variables from parents downwards.
+        parentsUsedVars.addAll(usedVars);
+
+        int numInputs = op.getInputs().size();
+        for (int i = 0; i < numInputs; i++) {
+            Mutable<ILogicalOperator> childOpRef = op.getInputs().get(i);
+            int unusedJoinBranchIndex = removeJoinFromInputBranch(childOpRef);
+            if (unusedJoinBranchIndex >= 0) {
+                int usedBranchIndex = (unusedJoinBranchIndex == 0) ? 1 : 0;
+                // Remove the join at input index i by hooking up op's input i with the join's used branch.
+                AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) childOpRef.getValue();
+                op.getInputs().set(i, joinOp.getInputs().get(usedBranchIndex));
+                modified = true;
+            }
+            // Descend into children (childOpRef still holds the join when one was just unhooked).
+            modified |= removeUnusedJoin(childOpRef);
+        }
+        return modified;
+    }
+
+    /**
+     * Checks whether the operator in {@code opRef} is an inner join that can be removed: one input
+     * branch has no live variable in {@code parentsUsedVars} and the join uses nothing but primary
+     * key variables of data scans of one and the same dataset.
+     * @return the index of the unused input branch, or -1 if the join has to stay.
+     */
+    private int removeJoinFromInputBranch(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return -1;
+        }
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
+        if (!isEquiJoin(joinOp.getCondition())) {
+            return -1;
+        }
+
+        // Check whether one of the join branches is unused.
+        int unusedJoinBranchIndex = -1;
+        for (int i = 0; i < joinOp.getInputs().size(); i++) {
+            liveVars.clear();
+            VariableUtilities.getLiveVariables(joinOp.getInputs().get(i).getValue(), liveVars);
+            liveVars.retainAll(parentsUsedVars);
+            if (liveVars.isEmpty()) {
+                // None of the live variables from this branch are used by its parents.
+                unusedJoinBranchIndex = i;
+                break;
+            }
+        }
+        if (unusedJoinBranchIndex < 0) {
+            // The variables from both branches are used in the upstream plan. We cannot remove this join.
+            return -1;
+        }
+
+        // Check whether all used variables originate from primary keys of exactly the same dataset.
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(joinOp, usedVars);
+        // Collect the datascans whose primary key variables are used in the join condition. The list
+        // is a field, so the scans gathered for previously examined joins are dropped first.
+        dataScans.clear();
+        gatherProducingDataScans(opRef, usedVars, dataScans);
+        // Check that all datascans scan the same dataset and only their primary keys are used.
+        for (int i = 0; i < dataScans.size(); i++) {
+            if (i > 0) {
+                DatasetDataSource prevAqlDataSource = (DatasetDataSource) dataScans.get(i - 1).getDataSource();
+                DatasetDataSource currAqlDataSource = (DatasetDataSource) dataScans.get(i).getDataSource();
+                if (!prevAqlDataSource.getDataset().equals(currAqlDataSource.getDataset())) {
+                    return -1;
+                }
+            }
+            // Remove from the used variables all the primary key vars of this dataset.
+            fillPKVars(dataScans.get(i), pkVars);
+            usedVars.removeAll(pkVars);
+        }
+        if (!usedVars.isEmpty()) {
+            // The join also uses variables that are not primary keys of scans of that dataset.
+            return -1;
+        }
+        return unusedJoinBranchIndex;
+    }
+
+    /** Collects the data source scans reachable from {@code opRef} whose PK variables the join uses in full. */
+    private void gatherProducingDataScans(Mutable<ILogicalOperator> opRef, List<LogicalVariable> joinUsedVars,
+            List<DataSourceScanOperator> dataScans) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+            for (Mutable<ILogicalOperator> inputOp : op.getInputs()) {
+                gatherProducingDataScans(inputOp, joinUsedVars, dataScans);
+            }
+            return;
+        }
+        DataSourceScanOperator dataScan = (DataSourceScanOperator) op;
+        fillPKVars(dataScan, pkVars);
+        if (joinUsedVars.containsAll(pkVars)) {
+            dataScans.add(dataScan);
+        }
+    }
+
+    /** Resets {@code pkVars} to the scan's leading variables, one per partitioning key; empty unless the dataset is internal. */
+    private void fillPKVars(DataSourceScanOperator dataScan, List<LogicalVariable> pkVars) {
+        pkVars.clear();
+        DatasetDataSource datasetDataSource = (DatasetDataSource) dataScan.getDataSource();
+        if (datasetDataSource.getDataset().getDatasetDetails() instanceof InternalDatasetDetails) {
+            int numPKs = DatasetUtils.getPartitioningKeys(datasetDataSource.getDataset()).size();
+            for (int i = 0; i < numPKs; i++) {
+                pkVars.add(dataScan.getVariables().get(i));
+            }
+        }
+    }
+
+    /** Accepts anything but a function call other than AND/EQ; the arguments of an AND are not inspected. */
+    private boolean isEquiJoin(Mutable<ILogicalExpression> conditionExpr) {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) conditionExpr.getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return true;
+        }
+        FunctionIdentifier funcIdent = ((AbstractFunctionCallExpression) expr).getFunctionIdentifier();
+        return funcIdent == AlgebricksBuiltinFunctions.AND || funcIdent == AlgebricksBuiltinFunctions.EQ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
new file mode 100644
index 0000000..8de761a
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.algebra.operators.CommitOperator;
+import edu.uci.ics.asterix.algebra.operators.physical.CommitPOperator;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.DatasetDataSource;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Rewrite rule that replaces a SINK operator with an {@link ExtensionOperator} wrapping a
+ * {@link CommitOperator}, provided that a non-bulk-load insert/delete (or index insert/delete)
+ * operator is found below the sink.
+ */
+public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
+
+    /** This rule never rewrites on the way down; all work happens in {@link #rewritePost}. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Replaces the referenced SINK operator with a commit extension operator.
+     * <p>
+     * Starting at the sink's first input, the first input of each operator is followed until an
+     * INDEX_INSERT_DELETE or INSERT_DELETE operator that is not a bulk load is found, or until an
+     * operator without inputs is reached. The primary-key expressions and dataset id of the operator
+     * that was found are used to build the commit operator.
+     *
+     * @param opRef
+     *            reference to the operator under consideration; updated in place on success
+     * @param context
+     *            optimization context supplying the metadata provider and type environments
+     * @return {@code true} if the sink was replaced; {@code false} if the operator is not a sink or
+     *         no suitable insert/delete operator was found
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        AbstractLogicalOperator rootOp = (AbstractLogicalOperator) opRef.getValue();
+        if (rootOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+            return false;
+        }
+        SinkOperator sink = (SinkOperator) rootOp;
+
+        // Search along the chain of first inputs for the modifying operator.
+        List<Mutable<ILogicalExpression>> pkExprs = null;
+        int dsId = 0;
+        AbstractLogicalOperator current = (AbstractLogicalOperator) sink.getInputs().get(0).getValue();
+        while (current != null) {
+            LogicalOperatorTag tag = current.getOperatorTag();
+            if (tag == LogicalOperatorTag.INDEX_INSERT_DELETE) {
+                IndexInsertDeleteOperator indexModifyOp = (IndexInsertDeleteOperator) current;
+                if (!indexModifyOp.isBulkload()) {
+                    pkExprs = indexModifyOp.getPrimaryKeyExpressions();
+                    DatasetDataSource source = (DatasetDataSource) indexModifyOp.getDataSourceIndex()
+                            .getDataSource();
+                    dsId = source.getDataset().getDatasetId();
+                    break;
+                }
+            } else if (tag == LogicalOperatorTag.INSERT_DELETE) {
+                InsertDeleteOperator modifyOp = (InsertDeleteOperator) current;
+                if (!modifyOp.isBulkload()) {
+                    pkExprs = modifyOp.getPrimaryKeyExpressions();
+                    DatasetDataSource source = (DatasetDataSource) modifyOp.getDataSource();
+                    dsId = source.getDataset().getDatasetId();
+                    break;
+                }
+            }
+            if (current.getInputs().isEmpty()) {
+                break;
+            }
+            current = (AbstractLogicalOperator) current.getInputs().get(0).getValue();
+        }
+
+        // Nothing to commit against: leave the sink as it is.
+        if (pkExprs == null) {
+            return false;
+        }
+
+        // Build fresh variables carrying the ids of the primary-key variables.
+        List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
+        for (Mutable<ILogicalExpression> pkExprRef : pkExprs) {
+            VariableReferenceExpression pkVarRef = (VariableReferenceExpression) pkExprRef.getValue();
+            int varId = pkVarRef.getVariableReference().getId();
+            pkVars.add(new LogicalVariable(varId));
+        }
+
+        // The job id is taken from the metadata provider.
+        JobId jobId = ((AqlMetadataProvider) context.getMetadataProvider()).getJobId();
+
+        // Logical and physical commit operators share the same primary-key variable list.
+        CommitOperator logicalCommit = new CommitOperator(pkVars);
+        CommitPOperator physicalCommit = new CommitPOperator(jobId, dsId, pkVars);
+        logicalCommit.setPhysicalOperator(physicalCommit);
+
+        // Wrap the commit operator in an extension operator.
+        ExtensionOperator commitExtension = new ExtensionOperator(logicalCommit);
+        commitExtension.setPhysicalOperator(physicalCommit);
+
+        // Splice the extension operator into the plan in place of the sink.
+        commitExtension.getInputs().add(sink.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(commitExtension);
+        opRef.setValue(commitExtension);
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
new file mode 100644
index 0000000..65c9da8
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.algebra.operators.physical.BTreeSearchPOperator;
+import edu.uci.ics.asterix.algebra.operators.physical.InvertedIndexPOperator;
+import edu.uci.ics.asterix.algebra.operators.physical.RTreeSearchPOperator;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.rewriter.util.JoinUtils;
+
+public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+
+        computeDefaultPhysicalOp(op, context);
+        context.addToDontApplySet(this, op);
+        return true;
+    }
+
+    private static void setPhysicalOperators(ILogicalPlan plan, IOptimizationContext context)
+            throws AlgebricksException {
+        for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+            computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), context);
+        }
+    }
+
+    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+        if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
+            GroupByOperator gby = (GroupByOperator) op;
+            if (gby.getNestedPlans().size() == 1) {
+                ILogicalPlan p0 = gby.getNestedPlans().get(0);
+                if (p0.getRoots().size() == 1) {
+                    Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+                    if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().equals(
+                            LogicalOperatorTag.AGGREGATE)) {
+                        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+                        boolean serializable = true;
+                        for (Mutable<ILogicalExpression> exprRef : aggOp.getExpressions()) {
+                            AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) exprRef.getValue();
+                            if (!AsterixBuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier())) {
+                                serializable = false;
+                                break;
+                            }
+                        }
+
+                        if ((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+                                .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
+                            boolean setToExternalGby = false;
+                            if (serializable) {
+                                // if serializable, use external group-by
+                                // now check whether the serialized version aggregation function has corresponding intermediate agg
+                                boolean hasIntermediateAgg = true;
+                                IMergeAggregationExpressionFactory mergeAggregationExpressionFactory = context
+                                        .getMergeAggregationExpressionFactory();
+                                List<LogicalVariable> originalVariables = aggOp.getVariables();
+                                List<Mutable<ILogicalExpression>> aggExprs = aggOp.getExpressions();
+                                int aggNum = aggExprs.size();
+                                for (int i = 0; i < aggNum; i++) {
+                                    AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) aggExprs
+                                            .get(i).getValue();
+                                    AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
+                                            .makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
+                                                    expr.getArguments());
+                                    if (mergeAggregationExpressionFactory.createMergeAggregation(
+                                            originalVariables.get(i), serialAggExpr, context) == null) {
+                                        hasIntermediateAgg = false;
+                                        break;
+                                    }
+                                }
+
+                                if (hasIntermediateAgg) {
+                                    for (int i = 0; i < aggNum; i++) {
+                                        AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) aggExprs
+                                                .get(i).getValue();
+                                        AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
+                                                .makeSerializableAggregateFunctionExpression(
+                                                        expr.getFunctionIdentifier(), expr.getArguments());
+                                        aggOp.getExpressions().get(i).setValue(serialAggExpr);
+                                    }
+                                    ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
+                                            gby.getGroupByList(),
+                                            physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
+                                            physicalOptimizationConfig.getExternalGroupByTableSize());
+                                    generateMergeAggregationExpressions(gby, context);
+                                    op.setPhysicalOperator(externalGby);
+                                    setToExternalGby = true;
+                                }
+                            }
+
+                            if (!setToExternalGby) {
+                                // if not serializable or no intermediate agg, use pre-clustered group-by
+                                List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
+                                List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
+                                for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+                                    ILogicalExpression expr = p.second.getValue();
+                                    if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                                        VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                                        columnList.add(varRef.getVariableReference());
+                                    }
+                                }
+                                op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
+                            }
+                        }
+                    } else if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().equals(
+                            LogicalOperatorTag.RUNNINGAGGREGATE)) {
+                        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
+                        List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
+                        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
+                            ILogicalExpression expr = p.second.getValue();
+                            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                                VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                                columnList.add(varRef.getVariableReference());
+                            }
+                        }
+                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
+                    } else {
+                        throw new AlgebricksException("Unsupported nested operator within a group-by: "
+                                + ((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().name());
+                    }
+                }
+            }
+        }
+        if (op.getPhysicalOperator() == null) {
+            switch (op.getOperatorTag()) {
+                case INNERJOIN: {
+                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+                    break;
+                }
+                case LEFTOUTERJOIN: {
+                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+                    break;
+                }
+                case UNNEST_MAP: {
+                    UnnestMapOperator unnestMap = (UnnestMapOperator) op;
+                    ILogicalExpression unnestExpr = unnestMap.getExpressionRef().getValue();
+                    if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
+                        FunctionIdentifier fid = f.getFunctionIdentifier();
+                        if (!fid.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+                            throw new IllegalStateException();
+                        }
+                        AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
+                        jobGenParams.readFromFuncArgs(f.getArguments());
+                        AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
+                        AqlSourceId dataSourceId = new AqlSourceId(jobGenParams.getDataverseName(),
+                                jobGenParams.getDatasetName());
+                        IDataSourceIndex<String, AqlSourceId> dsi = mp.findDataSourceIndex(jobGenParams.getIndexName(),
+                                dataSourceId);
+                        if (dsi == null) {
+                            throw new AlgebricksException("Could not find index " + jobGenParams.getIndexName()
+                                    + " for dataset " + dataSourceId);
+                        }
+                        IndexType indexType = jobGenParams.getIndexType();
+                        boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
+                        switch (indexType) {
+                            case BTREE: {
+                                BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
+                                btreeJobGenParams.readFromFuncArgs(f.getArguments());
+                                op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast,
+                                        btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.isEqCondition(),
+                                        btreeJobGenParams.getLowKeyVarList(), btreeJobGenParams.getHighKeyVarList()));
+                                break;
+                            }
+                            case RTREE: {
+                                op.setPhysicalOperator(new RTreeSearchPOperator(dsi, requiresBroadcast));
+                                break;
+                            }
+                            case SINGLE_PARTITION_WORD_INVIX:
+                            case SINGLE_PARTITION_NGRAM_INVIX: {
+                                op.setPhysicalOperator(new InvertedIndexPOperator(dsi, requiresBroadcast, false));
+                                break;
+                            }
+                            case LENGTH_PARTITIONED_WORD_INVIX:
+                            case LENGTH_PARTITIONED_NGRAM_INVIX: {
+                                op.setPhysicalOperator(new InvertedIndexPOperator(dsi, requiresBroadcast, true));
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException(indexType + " indexes are not implemented.");
+                            }
+                        }
+                    }
+                    break;
+                }
+            }
+        }
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : nested.getNestedPlans()) {
+                setPhysicalOperators(p, context);
+            }
+        }
+        for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
+            computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), context);
+        }
+    }
+
+    private static void generateMergeAggregationExpressions(GroupByOperator gby, IOptimizationContext context)
+            throws AlgebricksException {
+        if (gby.getNestedPlans().size() != 1) {
+            throw new AlgebricksException(
+                    "External group-by currently works only for one nested plan with one root containing"
+                            + "an aggregate and a nested-tuple-source.");
+        }
+        ILogicalPlan p0 = gby.getNestedPlans().get(0);
+        if (p0.getRoots().size() != 1) {
+            throw new AlgebricksException(
+                    "External group-by currently works only for one nested plan with one root containing"
+                            + "an aggregate and a nested-tuple-source.");
+        }
+        IMergeAggregationExpressionFactory mergeAggregationExpressionFactory = context
+                .getMergeAggregationExpressionFactory();
+        Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+        AbstractLogicalOperator r0Logical = (AbstractLogicalOperator) r0.getValue();
+        if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            throw new AlgebricksException("The merge aggregation expression generation should not process a "
+                    + r0Logical.getOperatorTag() + " operator.");
+        }
+        AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+        List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
+        List<LogicalVariable> aggProducedVars = aggOp.getVariables();
+        int n = aggOp.getExpressions().size();
+        List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
+        for (int i = 0; i < n; i++) {
+            ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
+                    aggProducedVars.get(i), aggFuncRefs.get(i).getValue(), context);
+            if (mergeExpr == null) {
+                throw new AlgebricksException("The aggregation function " + aggFuncRefs.get(i).getValue()
+                        + " does not have a registered intermediate aggregation function.");
+            }
+            mergeExpressionRefs.add(new MutableObject<ILogicalExpression>(mergeExpr));
+        }
+        aggOp.setMergeExpressions(mergeExpressionRefs);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetClosedRecordConstructorsRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetClosedRecordConstructorsRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetClosedRecordConstructorsRule.java
new file mode 100644
index 0000000..4b9a947
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetClosedRecordConstructorsRule.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
+import edu.uci.ics.asterix.om.types.AOrderedListType;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.AbstractConstVarFunVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * open-record-constructor() becomes closed-record-constructor() if all the
+ * branches below lead to dataset scans for closed record types
+ */
+
+public class SetClosedRecordConstructorsRule implements IAlgebraicRewriteRule {
+
+    private SettingClosedRecordVisitor recordVisitor;
+
+    /**
+     * Creates the rule together with the single expression visitor instance that is reused for every operator
+     * this rule is applied to.
+     */
+    public SetClosedRecordConstructorsRule() {
+        recordVisitor = new SettingClosedRecordVisitor();
+    }
+
+    /**
+     * This hook never rewrites anything; the rule does all of its work in
+     * {@link #rewritePost(Mutable, IOptimizationContext)}.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    /**
+     * Runs the closed-record visitor over all expressions of the given operator.
+     * <p>
+     * Each operator is processed at most once: it is added to this rule's "don't apply" set before the visitor
+     * runs. When the visitor reports a change, the operator's type environment is recomputed.
+     *
+     * @param opRef
+     *            reference to the operator to inspect
+     * @param ctx
+     *            optimization context used for bookkeeping and type computation
+     * @return {@code true} if at least one expression of the operator was changed
+     * @throws AlgebricksException
+     *             if type computation or the visitor fails
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext ctx) throws AlgebricksException {
+        AbstractLogicalOperator operator = (AbstractLogicalOperator) opRef.getValue();
+        if (ctx.checkIfInDontApplySet(this, operator)) {
+            return false;
+        }
+        // Register the operator up front so that this rule is never applied to it a second time.
+        ctx.addToDontApplySet(this, operator);
+        recordVisitor.setOptimizationContext(ctx, operator.computeInputTypeEnvironment(ctx));
+        if (!operator.acceptExpressionTransform(recordVisitor)) {
+            return false;
+        }
+        // At least one expression changed, so the types produced by this operator must be recomputed.
+        ctx.computeAndSetTypeEnvironmentForOperator(operator);
+        return true;
+    }
+
+    private static class SettingClosedRecordVisitor extends AbstractConstVarFunVisitor<ClosedDataInfo, Void> implements
+            ILogicalExpressionReferenceTransform {
+
+        private IOptimizationContext context;
+        private IVariableTypeEnvironment env;
+
+        /**
+         * Supplies the state the visitor needs for the next operator it is run on.
+         *
+         * @param context
+         *            optimization context; used to obtain the expression type computer and metadata provider
+         * @param env
+         *            type environment used to look up variable types and to type expressions
+         */
+        public void setOptimizationContext(IOptimizationContext context, IVariableTypeEnvironment env) {
+            this.context = context;
+            this.env = env;
+        }
+
+        /**
+         * Visits the referenced expression and, if the visit reports a change, stores the resulting expression
+         * back into the reference.
+         *
+         * @param exprRef
+         *            reference to the expression to visit
+         * @return {@code true} if the visit reported a change
+         */
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            AbstractLogicalExpression current = (AbstractLogicalExpression) exprRef.getValue();
+            ClosedDataInfo info = current.accept(this, null);
+            if (!info.expressionChanged) {
+                return false;
+            }
+            exprRef.setValue(info.expression);
+            return true;
+        }
+
+        /**
+         * A constant is never rewritten; the result only records whether its computed type counts as closed.
+         */
+        @Override
+        public ClosedDataInfo visitConstantExpression(ConstantExpression expr, Void arg) throws AlgebricksException {
+            return new ClosedDataInfo(false, hasClosedType(expr), expr);
+        }
+
+        /**
+         * Visits a function call and switches open-record-constructor calls to the closed record constructor
+         * where possible.
+         * <p>
+         * For an open-record-constructor whose required type is absent or not open, the arguments are consumed
+         * in pairs. The call is switched to the closed constructor only if the first expression of every pair is
+         * a constant and the second one is reported as closed data by a recursive visit.
+         * <p>
+         * For any other function the arguments are visited recursively, except for ordered/unordered list
+         * constructors that have no required type, which are left untouched.
+         *
+         * @return change flag, closedness of the (possibly rewritten) call's computed type, and the call itself
+         * @throws AlgebricksException
+         *             if a record constructor has an odd number of arguments or type computation fails
+         */
+        @Override
+        public ClosedDataInfo visitFunctionCallExpression(AbstractFunctionCallExpression expr, Void arg)
+                throws AlgebricksException {
+            boolean changed = false;
+
+            if (!expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+                boolean isListConstructor = expr.getFunctionIdentifier().equals(
+                        AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR)
+                        || expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR);
+                // The required type is only looked up for list constructors; without one they are not descended into.
+                boolean descend = !isListConstructor || TypeComputerUtilities.getRequiredType(expr) != null;
+                if (descend) {
+                    for (Mutable<ILogicalExpression> argRef : expr.getArguments()) {
+                        ClosedDataInfo argInfo = argRef.getValue().accept(this, arg);
+                        if (argInfo.expressionChanged) {
+                            argRef.setValue(argInfo.expression);
+                            changed = true;
+                        }
+                    }
+                }
+                return new ClosedDataInfo(changed, hasClosedType(expr), expr);
+            }
+
+            ARecordType requiredType = (ARecordType) TypeComputerUtilities.getRequiredType(expr);
+            if (requiredType != null && requiredType.isOpen()) {
+                // An open required type keeps the open constructor; its arguments are not visited at all.
+                return new ClosedDataInfo(false, hasClosedType(expr), expr);
+            }
+
+            int argCount = expr.getArguments().size();
+            if (argCount % 2 != 0) {
+                throw new AlgebricksException("Record constructor expected to have an even number of arguments: "
+                        + expr);
+            }
+            boolean allClosed = true;
+            for (int pairStart = 0; pairStart < argCount; pairStart += 2) {
+                ILogicalExpression first = expr.getArguments().get(pairStart).getValue();
+                boolean firstIsConstant = first.getExpressionTag() == LogicalExpressionTag.CONSTANT;
+                Mutable<ILogicalExpression> secondRef = expr.getArguments().get(pairStart + 1);
+                ClosedDataInfo secondInfo = secondRef.getValue().accept(this, arg);
+                allClosed = allClosed && firstIsConstant && secondInfo.dataIsClosed;
+                if (secondInfo.expressionChanged) {
+                    secondRef.setValue(secondInfo.expression);
+                    changed = true;
+                }
+            }
+            if (allClosed) {
+                expr.setFunctionInfo(FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR));
+                GlobalConfig.ASTERIX_LOGGER.finest("Switching to CLOSED record constructor in " + expr + ".\n");
+                changed = true;
+            }
+            return new ClosedDataInfo(changed, hasClosedType(expr), expr);
+        }
+
+        /**
+         * A variable reference is never rewritten; its closedness is derived from the variable's type in the
+         * current type environment.
+         *
+         * @throws AlgebricksException
+         *             if the environment has no type for the variable
+         */
+        @Override
+        public ClosedDataInfo visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
+                throws AlgebricksException {
+            Object inferredType = env.getVarType(expr.getVariableReference());
+            if (inferredType != null) {
+                return new ClosedDataInfo(false, isClosedRec((IAType) inferredType), expr);
+            }
+            throw new AlgebricksException("Could not infer type for variable '" + expr.getVariableReference() + "'.");
+        }
+
+        /**
+         * Computes the type of the expression with the context's expression type computer (using the current
+         * type environment) and reports whether {@link #isClosedRec(IAType)} considers that type closed.
+         */
+        private boolean hasClosedType(ILogicalExpression expr) throws AlgebricksException {
+            IAType t = (IAType) context.getExpressionTypeComputer().getType(expr, context.getMetadataProvider(), env);
+            return isClosedRec(t);
+        }
+
+        /**
+         * Decides whether a type counts as closed for the purpose of this rule.
+         * <ul>
+         * <li>ANY is never closed.</li>
+         * <li>The listed primitive types are always closed.</li>
+         * <li>A record is closed iff it is not open.</li>
+         * <li>A union is closed iff every member of its union list is closed.</li>
+         * <li>An ordered or unordered list is closed iff its item type is closed.</li>
+         * </ul>
+         *
+         * @throws IllegalStateException
+         *             for any type tag not covered above
+         */
+        private static boolean isClosedRec(IAType t) throws AlgebricksException {
+            switch (t.getTypeTag()) {
+                case ANY:
+                    return false;
+
+                case CIRCLE:
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                case BINARY:
+                case BITARRAY:
+                case FLOAT:
+                case DOUBLE:
+                case STRING:
+                case LINE:
+                case NULL:
+                case BOOLEAN:
+                case DATETIME:
+                case DATE:
+                case TIME:
+                case UUID:
+                case DURATION:
+                case YEARMONTHDURATION:
+                case DAYTIMEDURATION:
+                case INTERVAL:
+                case POINT:
+                case POINT3D:
+                case POLYGON:
+                case RECTANGLE:
+                case SHORTWITHOUTTYPEINFO:
+                    return true;
+
+                case RECORD:
+                    return !((ARecordType) t).isOpen();
+
+                case UNION:
+                    // One non-closed member makes the whole union non-closed.
+                    for (IAType member : ((AUnionType) t).getUnionList()) {
+                        if (!isClosedRec(member)) {
+                            return false;
+                        }
+                    }
+                    return true;
+
+                case ORDEREDLIST:
+                    return isClosedRec(((AOrderedListType) t).getItemType());
+
+                case UNORDEREDLIST:
+                    return isClosedRec(((AUnorderedListType) t).getItemType());
+
+                default:
+                    throw new IllegalStateException("Closed type analysis not implemented for type " + t);
+            }
+        }
+    }
+
+    /**
+     * Immutable result of visiting one expression with the closed-record visitor.
+     */
+    private static class ClosedDataInfo {
+        /** Whether the visit changed the expression or one of its sub-expressions. */
+        final boolean expressionChanged;
+        /** Whether the data produced by the expression was classified as closed. */
+        final boolean dataIsClosed;
+        /** The expression to store back into the visited reference when a change is reported. */
+        final ILogicalExpression expression;
+
+        public ClosedDataInfo(boolean expressionChanged, boolean dataIsClosed, ILogicalExpression expression) {
+            this.expressionChanged = expressionChanged;
+            this.dataIsClosed = dataIsClosed;
+            this.expression = expression;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SimilarityCheckRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SimilarityCheckRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SimilarityCheckRule.java
new file mode 100644
index 0000000..2d03d69
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SimilarityCheckRule.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Looks for a select operator, containing a condition:
+ * similarity-function GE/GT/LE/LT constant/variable
+ * Rewrites the select condition (and possibly the assign expr) with the equivalent similarity-check function.
+ */
+public class SimilarityCheckRule implements IAlgebraicRewriteRule {
+
+    /**
+     * Entry point of the rule: applies only to select operators.
+     * <p>
+     * Starting at the select's first input, any directly chained selects are skipped and the run of assign
+     * operators found after them is collected. The select condition is then rewritten with those assigns as
+     * candidates for variable definitions.
+     *
+     * @return the result of {@code replaceSelectConditionExprs} for the select condition, or {@code false} if
+     *         the operator is not a select
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator candidate = (AbstractLogicalOperator) opRef.getValue();
+        if (candidate.getOperatorTag() != LogicalOperatorTag.SELECT) {
+            return false;
+        }
+        SelectOperator selectOp = (SelectOperator) candidate;
+        Mutable<ILogicalExpression> conditionRef = selectOp.getCondition();
+
+        // Walk down the first-input chain: first past any selects ...
+        AbstractLogicalOperator below = (AbstractLogicalOperator) selectOp.getInputs().get(0).getValue();
+        while (below.getOperatorTag() == LogicalOperatorTag.SELECT) {
+            below = (AbstractLogicalOperator) below.getInputs().get(0).getValue();
+        }
+        // ... then collect the consecutive assigns that follow.
+        List<AssignOperator> assignsBelow = new ArrayList<AssignOperator>();
+        for (; below.getOperatorTag() == LogicalOperatorTag.ASSIGN; below = (AbstractLogicalOperator) below
+                .getInputs().get(0).getValue()) {
+            assignsBelow.add((AssignOperator) below);
+        }
+        return replaceSelectConditionExprs(conditionRef, assignsBelow, context);
+    }
+
+    /**
+     * Tries to replace comparisons between a similarity function (or a variable) and a constant with the
+     * equivalent similarity-check expression.
+     * <p>
+     * An AND expression is traversed recursively. Every conjunct is visited, and the method reports a change as
+     * soon as at least one of them was rewritten, so that the caller always learns about a modified plan. Any
+     * other expression must be a GE/GT/LE/LT comparison with a constant on exactly one side to be considered.
+     *
+     * @param expRef
+     *            reference to the (sub-)expression of the select condition; replaced in place on success
+     * @param assigns
+     *            assign operators below the select that may define a variable used in the comparison
+     * @param context
+     *            optimization context
+     * @return {@code true} if at least one expression was rewritten
+     * @throws AlgebricksException
+     *             if building the similarity-check expression fails
+     */
+    private boolean replaceSelectConditionExprs(Mutable<ILogicalExpression> expRef, List<AssignOperator> assigns,
+            IOptimizationContext context) throws AlgebricksException {
+        ILogicalExpression expr = expRef.getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        FunctionIdentifier funcIdent = funcExpr.getFunctionIdentifier();
+        // Recursively traverse conjuncts.
+        // TODO: Ignore disjuncts for now, because some replacements may be invalid.
+        // For example, if the result of the similarity function is used somewhere upstream,
+        // then we may still need the true similarity value even if the GE/GT/LE/LT comparison returns false.
+        if (funcIdent == AlgebricksBuiltinFunctions.AND) {
+            boolean found = false;
+            for (Mutable<ILogicalExpression> conjunctRef : funcExpr.getArguments()) {
+                // The recursive call must come first: putting the flag first would short-circuit and skip the
+                // remaining conjuncts, and a non-matching conjunct must not hide an earlier rewrite.
+                found = replaceSelectConditionExprs(conjunctRef, assigns, context) || found;
+            }
+            return found;
+        }
+
+        // Look for GE/GT/LE/LT.
+        if (funcIdent != AlgebricksBuiltinFunctions.GE && funcIdent != AlgebricksBuiltinFunctions.GT
+                && funcIdent != AlgebricksBuiltinFunctions.LE && funcIdent != AlgebricksBuiltinFunctions.LT) {
+            return false;
+        }
+
+        // One arg should be a function call or a variable, the other a constant.
+        AsterixConstantValue constVal = null;
+        ILogicalExpression nonConstExpr = null;
+        ILogicalExpression arg1 = funcExpr.getArguments().get(0).getValue();
+        ILogicalExpression arg2 = funcExpr.getArguments().get(1).getValue();
+        // Normalized GE/GT/LE/LT as if constant was on the right hand side.
+        FunctionIdentifier normFuncIdent = null;
+        // One of the args must be a constant.
+        if (arg1.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+            ConstantExpression constExpr = (ConstantExpression) arg1;
+            constVal = (AsterixConstantValue) constExpr.getValue();
+            nonConstExpr = arg2;
+            // Get func ident as if swapping lhs and rhs.
+            normFuncIdent = getLhsAndRhsSwappedFuncIdent(funcIdent);
+        } else if (arg2.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+            ConstantExpression constExpr = (ConstantExpression) arg2;
+            constVal = (AsterixConstantValue) constExpr.getValue();
+            nonConstExpr = arg1;
+            // Constant is already on rhs, so nothing to be done for normalizedFuncIdent.
+            normFuncIdent = funcIdent;
+        } else {
+            return false;
+        }
+
+        // The other arg is a function call. We can directly replace the select condition with an equivalent similarity check expression.
+        if (nonConstExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            return replaceWithFunctionCallArg(expRef, normFuncIdent, constVal,
+                    (AbstractFunctionCallExpression) nonConstExpr);
+        }
+        // The other arg is a variable. We may have to introduce an assign operator that assigns the result of a similarity-check function to a variable.
+        if (nonConstExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            return replaceWithVariableArg(expRef, normFuncIdent, constVal, (VariableReferenceExpression) nonConstExpr,
+                    assigns, context);
+        }
+        return false;
+    }
+
+    /**
+     * Handles a comparison whose non-constant side is a variable reference.
+     * <p>
+     * The given assigns are searched for the one that defines the variable with a function call for which
+     * {@code getSimilarityCheckExpr} yields a similarity-check expression. If one is found:
+     * <ol>
+     * <li>a new assign computing the similarity-check expression into a fresh variable is inserted between the
+     * matching assign and its former first input;</li>
+     * <li>the select condition is replaced by {@code get-item(newVar, 0)};</li>
+     * <li>the original expression in the matching assign is replaced by {@code get-item(newVar, 1)};</li>
+     * <li>the type environments of the new and the matching assign are recomputed.</li>
+     * </ol>
+     * NOTE(review): presumably item 0 of the check result is the boolean outcome and item 1 the similarity
+     * value - confirm against the similarity-check function definitions.
+     *
+     * @param expRef
+     *            reference to the comparison inside the select condition; replaced on success
+     * @param normFuncIdent
+     *            comparison operator normalized as if the constant were on the right-hand side
+     * @param constVal
+     *            the constant the variable is compared with
+     * @param varRefExpr
+     *            the variable reference on the non-constant side
+     * @param assigns
+     *            candidate assign operators below the select
+     * @param context
+     *            optimization context, used to create the new variable and recompute type environments
+     * @return {@code true} if the plan was rewritten
+     */
+    private boolean replaceWithVariableArg(Mutable<ILogicalExpression> expRef, FunctionIdentifier normFuncIdent,
+            AsterixConstantValue constVal, VariableReferenceExpression varRefExpr, List<AssignOperator> assigns,
+            IOptimizationContext context) throws AlgebricksException {
+
+        // Find variable in assigns to determine its originating function.
+        LogicalVariable var = varRefExpr.getVariableReference();
+        Mutable<ILogicalExpression> simFuncExprRef = null;
+        ScalarFunctionCallExpression simCheckFuncExpr = null;
+        AssignOperator matchingAssign = null;
+        for (int i = 0; i < assigns.size(); i++) {
+            AssignOperator assign = assigns.get(i);
+            for (int j = 0; j < assign.getVariables().size(); j++) {
+                // Check if variables match.
+                if (var != assign.getVariables().get(j)) {
+                    continue;
+                }
+                // Check if corresponding expr is a function call.
+                if (assign.getExpressions().get(j).getValue().getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                    continue;
+                }
+                simFuncExprRef = assign.getExpressions().get(j);
+                // Analyze function expression and get equivalent similarity check function.
+                simCheckFuncExpr = getSimilarityCheckExpr(normFuncIdent, constVal,
+                        (AbstractFunctionCallExpression) simFuncExprRef.getValue());
+                matchingAssign = assign;
+                break;
+            }
+            // Stop scanning further assigns once an optimizable definition was found.
+            if (simCheckFuncExpr != null) {
+                break;
+            }
+        }
+
+        // Only non-null if we found that varRefExpr refers to an optimizable similarity function call.
+        if (simCheckFuncExpr != null) {
+            // Create a new assign under matchingAssign which assigns the result of our similarity-check function to a variable.
+            LogicalVariable newVar = context.newVar();
+            AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(
+                    simCheckFuncExpr));
+            // Hook up inputs.
+            newAssign.getInputs()
+                    .add(new MutableObject<ILogicalOperator>(matchingAssign.getInputs().get(0).getValue()));
+            matchingAssign.getInputs().get(0).setValue(newAssign);
+
+            // Replace select condition with a get-item on newVar.
+            List<Mutable<ILogicalExpression>> selectGetItemArgs = new ArrayList<Mutable<ILogicalExpression>>();
+            // First arg is a variable reference expr on newVar.
+            selectGetItemArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(newVar)));
+            // Second arg is the item index to be accessed, here 0.
+            selectGetItemArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                    new AsterixConstantValue(new AInt32(0)))));
+            ILogicalExpression selectGetItemExpr = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM), selectGetItemArgs);
+            // Replace the old similarity function call with the new getItemExpr.
+            expRef.setValue(selectGetItemExpr);
+
+            // Replace expr corresponding to original variable in the original assign with a get-item on newVar.
+            List<Mutable<ILogicalExpression>> assignGetItemArgs = new ArrayList<Mutable<ILogicalExpression>>();
+            // First arg is a variable reference expr on newVar.
+            assignGetItemArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(newVar)));
+            // Second arg is the item index to be accessed, here 1.
+            assignGetItemArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                    new AsterixConstantValue(new AInt32(1)))));
+            ILogicalExpression assignGetItemExpr = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM), assignGetItemArgs);
+            // Replace the original assign expr with the get-item expr.
+            simFuncExprRef.setValue(assignGetItemExpr);
+
+            // The new assign is typed first; matchingAssign now takes it as its input.
+            context.computeAndSetTypeEnvironmentForOperator(newAssign);
+            context.computeAndSetTypeEnvironmentForOperator(matchingAssign);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Handles a comparison whose non-constant side is itself a function call.
+     * <p>
+     * If {@code getSimilarityCheckExpr} yields a similarity-check expression for the call, the comparison in the
+     * select condition is replaced by {@code get-item(<similarity-check call>, 0)}.
+     *
+     * @param expRef
+     *            reference to the comparison inside the select condition; replaced on success
+     * @param normFuncIdent
+     *            comparison operator normalized as if the constant were on the right-hand side
+     * @param constVal
+     *            the constant the function result is compared with
+     * @param funcExpr
+     *            the function call on the non-constant side
+     * @return {@code true} if the condition was rewritten
+     */
+    private boolean replaceWithFunctionCallArg(Mutable<ILogicalExpression> expRef, FunctionIdentifier normFuncIdent,
+            AsterixConstantValue constVal, AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+        ScalarFunctionCallExpression checkExpr = getSimilarityCheckExpr(normFuncIdent, constVal, funcExpr);
+        if (checkExpr == null) {
+            // Not an optimizable similarity function; leave the condition untouched.
+            return false;
+        }
+
+        // Arguments of get-item: the similarity-check call itself and the index of the item to access (0).
+        Mutable<ILogicalExpression> checkRef = new MutableObject<ILogicalExpression>(checkExpr);
+        Mutable<ILogicalExpression> indexRef = new MutableObject<ILogicalExpression>(new ConstantExpression(
+                new AsterixConstantValue(new AInt32(0))));
+        List<Mutable<ILogicalExpression>> getItemArgs = new ArrayList<Mutable<ILogicalExpression>>();
+        getItemArgs.add(checkRef);
+        getItemArgs.add(indexRef);
+
+        expRef.setValue(new ScalarFunctionCallExpression(
+                FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.GET_ITEM), getItemArgs));
+        return true;
+    }
+
+    /**
+     * Builds the similarity-check call that is equivalent to comparing a similarity function with a constant.
+     * <ul>
+     * <li>{@code similarity-jaccard(...) >= c} becomes {@code similarity-jaccard-check(..., c)}.</li>
+     * <li>{@code similarity-jaccard(...) > c} uses the smallest float strictly greater than {@code c} as
+     * threshold.</li>
+     * <li>{@code edit-distance(...) <= c} becomes {@code edit-distance-check(..., c)}.</li>
+     * <li>{@code edit-distance(...) < c} uses {@code c - 1} as threshold.</li>
+     * </ul>
+     * No expression is produced (and the caller leaves the plan unchanged) when
+     * <ul>
+     * <li>the function is neither of the two similarity functions, or the comparison operator does not fit;</li>
+     * <li>the jaccard constant is neither a float nor a double;</li>
+     * <li>the comparison is strict and cannot be expressed as a non-strict threshold ({@code jaccard > c} with
+     * {@code c >= 1}, or {@code edit-distance < c} with {@code c <= 0}).</li>
+     * </ul>
+     * NOTE(review): this assumes the check functions test "similarity >= threshold" and "distance <= threshold"
+     * respectively - confirm against their implementations.
+     *
+     * @param normFuncIdent
+     *            comparison operator normalized as if the constant were on the right-hand side
+     * @param constVal
+     *            the constant of the comparison
+     * @param funcExpr
+     *            the candidate similarity function call
+     * @return the similarity-check call carrying the original arguments, the threshold and the original
+     *         annotations, or {@code null} if the comparison cannot be rewritten
+     * @throws AlgebricksException
+     *             if the edit-distance constant cannot be converted to INT32
+     */
+    private ScalarFunctionCallExpression getSimilarityCheckExpr(FunctionIdentifier normFuncIdent,
+            AsterixConstantValue constVal, AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+        // Remember args from original similarity function to add them to the similarity-check function later.
+        ArrayList<Mutable<ILogicalExpression>> similarityArgs = null;
+        ScalarFunctionCallExpression simCheckFuncExpr = null;
+        // Look for jaccard function call, and GE or GT.
+        if (funcExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.SIMILARITY_JACCARD) {
+            if (normFuncIdent != AlgebricksBuiltinFunctions.GE && normFuncIdent != AlgebricksBuiltinFunctions.GT) {
+                return null;
+            }
+            IAObject constObj = constVal.getObject();
+            float threshVal;
+            if (constObj instanceof AFloat) {
+                threshVal = ((AFloat) constObj).getFloatValue();
+            } else if (constObj instanceof ADouble) {
+                threshVal = (float) ((ADouble) constObj).getDoubleValue();
+            } else {
+                // Any other constant type would fail the cast; skip the optimization instead.
+                return null;
+            }
+            IAObject jaccThresh;
+            if (normFuncIdent == AlgebricksBuiltinFunctions.GE) {
+                jaccThresh = (constObj instanceof AFloat) ? constObj : new AFloat(threshVal);
+            } else {
+                if (threshVal >= 1.0f) {
+                    // "> c" with c >= 1 cannot be turned into a ">= threshold" check without clamping,
+                    // which would change the meaning of the predicate.
+                    return null;
+                }
+                // Adding Float.MIN_VALUE does not change any normal float; nextUp yields the next
+                // representable value, which makes ">= threshold" equivalent to "> c".
+                jaccThresh = new AFloat(Math.nextUp(threshVal));
+            }
+            similarityArgs = new ArrayList<Mutable<ILogicalExpression>>();
+            similarityArgs.addAll(funcExpr.getArguments());
+            similarityArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
+                    jaccThresh))));
+            simCheckFuncExpr = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.SIMILARITY_JACCARD_CHECK), similarityArgs);
+        }
+
+        // Look for edit-distance function call, and LE or LT.
+        if (funcExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.EDIT_DISTANCE) {
+            // Check the operator before touching the constant, so that comparisons which are never rewritten
+            // cannot fail on the conversion below.
+            if (normFuncIdent != AlgebricksBuiltinFunctions.LE && normFuncIdent != AlgebricksBuiltinFunctions.LT) {
+                return null;
+            }
+            AInt32 aInt;
+            try {
+                aInt = (AInt32) ATypeHierarchy.convertNumericTypeObject(constVal.getObject(), ATypeTag.INT32);
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
+            }
+
+            AInt32 edThresh;
+            if (normFuncIdent == AlgebricksBuiltinFunctions.LE) {
+                edThresh = aInt;
+            } else {
+                if (aInt.getIntegerValue() <= 0) {
+                    // "< c" with c <= 0 cannot be turned into a "<= threshold" check without clamping,
+                    // which would change the meaning of the predicate.
+                    return null;
+                }
+                edThresh = new AInt32(aInt.getIntegerValue() - 1);
+            }
+            similarityArgs = new ArrayList<Mutable<ILogicalExpression>>();
+            similarityArgs.addAll(funcExpr.getArguments());
+            similarityArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
+                    edThresh))));
+            simCheckFuncExpr = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.EDIT_DISTANCE_CHECK), similarityArgs);
+        }
+        // Preserve all annotations.
+        if (simCheckFuncExpr != null) {
+            simCheckFuncExpr.getAnnotations().putAll(funcExpr.getAnnotations());
+        }
+        return simCheckFuncExpr;
+    }
+
+    /**
+     * Returns the comparison operator that yields the same result once both operands are swapped:
+     * GE becomes LE, GT becomes LT, LE becomes GE and LT becomes GT.
+     *
+     * @throws IllegalStateException
+     *             if the given identifier is none of GE/GT/LE/LT
+     */
+    private FunctionIdentifier getLhsAndRhsSwappedFuncIdent(FunctionIdentifier oldFuncIdent) {
+        FunctionIdentifier swapped = null;
+        if (oldFuncIdent == AlgebricksBuiltinFunctions.GE) {
+            swapped = AlgebricksBuiltinFunctions.LE;
+        } else if (oldFuncIdent == AlgebricksBuiltinFunctions.GT) {
+            swapped = AlgebricksBuiltinFunctions.LT;
+        } else if (oldFuncIdent == AlgebricksBuiltinFunctions.LE) {
+            swapped = AlgebricksBuiltinFunctions.GE;
+        } else if (oldFuncIdent == AlgebricksBuiltinFunctions.LT) {
+            swapped = AlgebricksBuiltinFunctions.GT;
+        }
+        if (swapped == null) {
+            throw new IllegalStateException();
+        }
+        return swapped;
+    }
+
+    /**
+     * This hook never rewrites anything; the rule does all of its work in
+     * {@link #rewritePost(Mutable, IOptimizationContext)}.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+}