You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:20 UTC

[32/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineUnnestFunctionRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineUnnestFunctionRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineUnnestFunctionRule.java
new file mode 100644
index 0000000..418e114
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineUnnestFunctionRule.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Inlines unnesting functions that are held by variables: when the scan-collection call of an
+ * UNNEST operator takes a variable that an ASSIGN below it binds to an unnesting function call,
+ * that call replaces the UNNEST expression and the binding is removed from the ASSIGN.
+ * This rule is to fix issue 201.
+ */
+public class InlineUnnestFunctionRule implements IAlgebraicRewriteRule {
+
+    /** Does nothing; all work happens in {@link #rewritePost}. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Tries to inline every variable argument of a scan-collection call held by an UNNEST operator.
+     * Each operator is examined at most once (tracked through the context's don't-apply set).
+     *
+     * @return false if the operator was seen before, is not an UNNEST, or does not unnest a
+     *         scan-collection call; true otherwise, even when no variable could be inlined
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op1)) {
+            return false;
+        }
+        context.addToDontApplySet(this, op1);
+        if (op1.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return false;
+        }
+        UnnestOperator unnestOperator = (UnnestOperator) op1;
+        AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) unnestOperator.getExpressionRef()
+                .getValue();
+        // We only inline for the scan-collection function. Compare identifiers by value, not by
+        // reference, so that an equal identifier held in a different instance is still recognized.
+        if (!AsterixBuiltinFunctions.SCAN_COLLECTION.equals(expr.getFunctionIdentifier())) {
+            return false;
+        }
+
+        // Inline all variables from the unnesting function call. The argument list of expr itself
+        // is never modified by inlineVariable, so iterating over it directly is safe.
+        for (Mutable<ILogicalExpression> argRef : expr.getArguments()) {
+            ILogicalExpression argExpr = argRef.getValue();
+            if (argExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) argExpr;
+                inlineVariable(varExpr.getVariableReference(), unnestOperator);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Inlines one variable.
+     *
+     * @param usedVar
+     *            A variable that is used by the scan-collection function in the unnest operator
+     * @param unnestOp
+     *            The unnest operator; its expression is replaced when an origin is found.
+     * @throws AlgebricksException
+     *             propagated from the search for the variable's origin
+     */
+    private void inlineVariable(LogicalVariable usedVar, UnnestOperator unnestOp) throws AlgebricksException {
+        AbstractFunctionCallExpression expr = (AbstractFunctionCallExpression) unnestOp.getExpressionRef().getValue();
+        List<Pair<AbstractFunctionCallExpression, Integer>> parentAndIndexList = new ArrayList<Pair<AbstractFunctionCallExpression, Integer>>();
+        getParentFunctionExpression(usedVar, expr, parentAndIndexList);
+        ILogicalExpression usedVarOrginExpr = findUsedVarOrigin(usedVar, unnestOp, (AbstractLogicalOperator) unnestOp
+                .getInputs().get(0).getValue());
+        if (usedVarOrginExpr == null) {
+            return;
+        }
+        for (Pair<AbstractFunctionCallExpression, Integer> parentAndIndex : parentAndIndexList) {
+            // We only rewrite the top scan-collection function.
+            if (parentAndIndex.first == expr
+                    && AsterixBuiltinFunctions.SCAN_COLLECTION.equals(expr.getFunctionIdentifier())) {
+                unnestOp.getExpressionRef().setValue(usedVarOrginExpr);
+            }
+        }
+    }
+
+    /**
+     * Collects every (function call, argument index) pair at which usedVar appears as a direct
+     * argument, searching expr and all function calls nested in its arguments.
+     *
+     * @param expr
+     *            the expression to search; it is cast to a function call expression
+     * @param parentAndIndexList
+     *            output list that the matches are appended to
+     */
+    private void getParentFunctionExpression(LogicalVariable usedVar, ILogicalExpression expr,
+            List<Pair<AbstractFunctionCallExpression, Integer>> parentAndIndexList) {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        for (int i = 0; i < args.size(); i++) {
+            ILogicalExpression argExpr = args.get(i).getValue();
+            if (argExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) argExpr;
+                if (varExpr.getVariableReference().equals(usedVar)) {
+                    parentAndIndexList.add(new Pair<AbstractFunctionCallExpression, Integer>(funcExpr, i));
+                }
+            } else if (argExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                getParentFunctionExpression(usedVar, argExpr, parentAndIndexList);
+            }
+        }
+    }
+
+    /**
+     * Searches the plan starting at currentOp for the ASSIGN that produces usedVar. If the variable
+     * is bound to a builtin unnesting function call, directly or through a chain of
+     * variable-to-variable assignments, every binding on that chain is removed from its ASSIGN and
+     * the call is returned. An ASSIGN that does not produce usedVar ends the search along that
+     * branch; any other operator is searched through all of its inputs.
+     *
+     * @param usedVar
+     *            the variable whose defining expression is wanted
+     * @param parentOp
+     *            the operator that has currentOp as one of its inputs
+     * @param currentOp
+     *            the operator to inspect
+     * @return the unnesting function call to inline, or null if none was found
+     */
+    private ILogicalExpression findUsedVarOrigin(LogicalVariable usedVar, AbstractLogicalOperator parentOp,
+            AbstractLogicalOperator currentOp) throws AlgebricksException {
+        ILogicalExpression ret = null;
+        if (currentOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(currentOp, producedVars);
+            if (producedVars.contains(usedVar)) {
+                AssignOperator assignOp = (AssignOperator) currentOp;
+                int index = assignOp.getVariables().indexOf(usedVar);
+                ILogicalExpression returnedExpr = assignOp.getExpressions().get(index).getValue();
+                if (returnedExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) returnedExpr;
+                    if (AsterixBuiltinFunctions.isBuiltinUnnestingFunction(funcExpr.getFunctionIdentifier())) {
+                        // We only inline for unnest functions.
+                        removeUnecessaryAssign(parentOp, currentOp, assignOp, index);
+                        ret = returnedExpr;
+                    }
+                } else if (returnedExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                    // Recursively inline: follow the variable that this one is a copy of.
+                    VariableReferenceExpression varExpr = (VariableReferenceExpression) returnedExpr;
+                    LogicalVariable var = varExpr.getVariableReference();
+                    ILogicalExpression finalExpr = findUsedVarOrigin(var, currentOp,
+                            (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue());
+                    if (finalExpr != null) {
+                        removeUnecessaryAssign(parentOp, currentOp, assignOp, index);
+                        ret = finalExpr;
+                    }
+                }
+            }
+        } else {
+            // Not an ASSIGN: keep looking through every input branch.
+            for (Mutable<ILogicalOperator> child : currentOp.getInputs()) {
+                ILogicalExpression expr = findUsedVarOrigin(usedVar, currentOp,
+                        (AbstractLogicalOperator) child.getValue());
+                if (expr != null) {
+                    ret = expr;
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Drops the binding at position index from assignOp and, if that leaves the ASSIGN without any
+     * variables, splices the operator out by pointing parentOp's input at the ASSIGN's own input.
+     *
+     * @param parentOp
+     *            the operator that has currentOp as one of its inputs
+     * @param currentOp
+     *            the ASSIGN operator as seen from parentOp's input list
+     * @param assignOp
+     *            the ASSIGN operator to remove the binding from
+     * @param index
+     *            position of the binding in the ASSIGN's variable and expression lists
+     */
+    private void removeUnecessaryAssign(AbstractLogicalOperator parentOp, AbstractLogicalOperator currentOp,
+            AssignOperator assignOp, int index) {
+        assignOp.getVariables().remove(index);
+        assignOp.getExpressions().remove(index);
+        if (assignOp.getVariables().isEmpty()) {
+            int opIndex = parentOp.getInputs().indexOf(new MutableObject<ILogicalOperator>(currentOp));
+            parentOp.getInputs().get(opIndex).setValue(assignOp.getInputs().get(0).getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
new file mode 100644
index 0000000..b64c0e7
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.asterix.metadata.declared.DatasetDataSource;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * For an insert into an internal dataset whose primary key is autogenerated, introduces an extra
+ * ASSIGN that merges the incoming record with a record holding a CREATE_UUID call under the
+ * primary key field name, and makes the operators above it use the merged record.
+ */
+public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
+
+    /** Does nothing; all work happens in {@link #rewritePost}. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Matches an INSERT into an internal dataset with an autogenerated id and adds the ASSIGN that
+     * supplies the generated key.
+     *
+     * @return true if the new ASSIGN was introduced, false if the pattern did not match
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        // match: [insert to internal dataset with autogenerated id] - assign - project
+        // produce: insert - assign - assign* - project
+        // **
+        // OR [insert to internal dataset with autogenerated id] - assign - [datasource scan]
+        // produce insert - assign - assign* - datasource scan
+
+        AbstractLogicalOperator currentOp = (AbstractLogicalOperator) opRef.getValue();
+        if (currentOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE) {
+            return false;
+        }
+
+        InsertDeleteOperator insertOp = (InsertDeleteOperator) currentOp;
+        if (insertOp.getOperation() != Kind.INSERT) {
+            return false;
+        }
+
+        // Check the data source type BEFORE the downcasts below, which are only valid for internal
+        // datasets; otherwise any other kind of source fails with a ClassCastException instead of
+        // simply being skipped by this rule.
+        if (((AqlDataSource) insertOp.getDataSource()).getDatasourceType() != AqlDataSourceType.INTERNAL_DATASET) {
+            return false;
+        }
+
+        DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+        InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dds.getDataset().getDatasetDetails();
+        if (!datasetDetails.isAutogenerated()) {
+            return false;
+        }
+
+        AbstractLogicalOperator parentOp = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+        if (parentOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AssignOperator assignOp = (AssignOperator) parentOp;
+        LogicalVariable inputRecord;
+
+        // Known bug: this will not work for internal datasets with filters, since the pattern
+        // becomes [project-assign-assign-insert]. This should be fixed.
+        AbstractLogicalOperator grandparentOp = (AbstractLogicalOperator) parentOp.getInputs().get(0).getValue();
+        if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+            ProjectOperator projectOp = (ProjectOperator) grandparentOp;
+            inputRecord = projectOp.getVariables().get(0);
+        } else if (grandparentOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+            DataSourceScanOperator dssOp = (DataSourceScanOperator) grandparentOp;
+            inputRecord = dssOp.getVariables().get(0);
+        } else {
+            return false;
+        }
+
+        // Only the first primary key is used; it is a list of field names (outermost first).
+        List<String> pkFieldName = datasetDetails.getPrimaryKey().get(0);
+        ILogicalExpression rec0 = new VariableReferenceExpression(inputRecord);
+        ILogicalExpression rec1 = createPrimaryKeyRecordExpression(pkFieldName);
+        ILogicalExpression mergedRec = createRecordMergeFunction(rec0, rec1);
+        ILogicalExpression nonNullMergedRec = createNotNullFunction(mergedRec);
+
+        // Place the new assign between assignOp and its former input, then make the operators
+        // above it read the merged record variable instead of the original record variable.
+        LogicalVariable v = context.newVar();
+        AssignOperator newAssign = new AssignOperator(v, new MutableObject<ILogicalExpression>(nonNullMergedRec));
+        newAssign.getInputs().add(new MutableObject<ILogicalOperator>(grandparentOp));
+        assignOp.getInputs().set(0, new MutableObject<ILogicalOperator>(newAssign));
+        VariableUtilities.substituteVariables(assignOp, inputRecord, v, context);
+        VariableUtilities.substituteVariables(insertOp, inputRecord, v, context);
+        context.computeAndSetTypeEnvironmentForOperator(newAssign);
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        context.computeAndSetTypeEnvironmentForOperator(insertOp);
+        return true;
+    }
+
+    /**
+     * Wraps the given expression in a call to the NOT_NULL builtin function.
+     *
+     * @param mergedRec
+     *            the expression to wrap
+     * @return the NOT_NULL function call with mergedRec as its only argument
+     */
+    private ILogicalExpression createNotNullFunction(ILogicalExpression mergedRec) {
+        List<Mutable<ILogicalExpression>> args = new ArrayList<>();
+        args.add(new MutableObject<ILogicalExpression>(mergedRec));
+        return new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.NOT_NULL), args);
+    }
+
+    /**
+     * Builds nested open-record-constructor calls that hold a CREATE_UUID call under the given
+     * field names: the last name labels the UUID, and each preceding name wraps the record built
+     * so far in another record.
+     *
+     * @param pkFieldName
+     *            the field names of the primary key, outermost first; must not be empty
+     * @return the outermost open-record-constructor call
+     */
+    private AbstractFunctionCallExpression createPrimaryKeyRecordExpression(List<String> pkFieldName) {
+        //Create lowest level of nested uuid
+        AbstractFunctionCallExpression uuidFn = new ScalarFunctionCallExpression(
+                FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CREATE_UUID));
+        List<Mutable<ILogicalExpression>> openRecordConsArgs = new ArrayList<>();
+        Mutable<ILogicalExpression> pkFieldNameExpression = new MutableObject<ILogicalExpression>(
+                new ConstantExpression(new AsterixConstantValue(new AString(pkFieldName.get(pkFieldName.size() - 1)))));
+        openRecordConsArgs.add(pkFieldNameExpression);
+        Mutable<ILogicalExpression> pkFieldValueExpression = new MutableObject<ILogicalExpression>(uuidFn);
+        openRecordConsArgs.add(pkFieldValueExpression);
+        AbstractFunctionCallExpression openRecFn = new ScalarFunctionCallExpression(
+                FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR), openRecordConsArgs);
+
+        //Create higher levels, walking from the second-to-last name out to the first
+        for (int i = pkFieldName.size() - 2; i >= 0; i--) {
+            AString fieldName = new AString(pkFieldName.get(i));
+            openRecordConsArgs = new ArrayList<>();
+            openRecordConsArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                    new AsterixConstantValue(fieldName))));
+            openRecordConsArgs.add(new MutableObject<ILogicalExpression>(openRecFn));
+            openRecFn = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR), openRecordConsArgs);
+        }
+
+        return openRecFn;
+    }
+
+    /**
+     * Builds a call to the RECORD_MERGE builtin function.
+     *
+     * @param rec0
+     *            first argument of the merge
+     * @param rec1
+     *            second argument of the merge
+     * @return the RECORD_MERGE function call over rec0 and rec1, in that order
+     */
+    private AbstractFunctionCallExpression createRecordMergeFunction(ILogicalExpression rec0, ILogicalExpression rec1) {
+        List<Mutable<ILogicalExpression>> recordMergeFnArgs = new ArrayList<>();
+        recordMergeFnArgs.add(new MutableObject<>(rec0));
+        recordMergeFnArgs.add(new MutableObject<>(rec1));
+        return new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.RECORD_MERGE),
+                recordMergeFnArgs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java
new file mode 100644
index 0000000..545a336
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastForExternalFunctionRule.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.functions.AsterixExternalScalarFunctionInfo;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule provides the same type-casting handling as the IntroduceDynamicTypeCastRule does.
+ * The only difference is that this rule is intended for external functions (User-Defined Functions).
+ * Refer to IntroduceDynamicTypeCastRule for the details.
+ */
+public class IntroduceDynamicTypeCastForExternalFunctionRule implements IAlgebraicRewriteRule {
+
+    /** Does nothing; all work happens in {@link #rewritePost}. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Adds not-null and/or cast-record wrapper functions for the record that feeds an external
+     * function call, when the record's type is optional or is not compatible with the record type
+     * the function declares for its first argument.
+     *
+     * @return true if at least one wrapper function was added, false otherwise
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        /**
+         * pattern match: distribute_result - project - assign (external function call) - assign (open_record_constructor)
+         * resulting plan: distribute_result - project - assign (external function call) - assign (cast-record) - assign(open_record_constructor)
+         */
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            return false;
+        }
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+            return false;
+        }
+        AbstractLogicalOperator op3 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+        if (op3.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AbstractLogicalOperator op4 = (AbstractLogicalOperator) op3.getInputs().get(0).getValue();
+        if (op4.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+
+        // assignOp1 : assign (external function call), assignOp2 : assign (open_record_constructor)
+        AssignOperator assignOp1 = (AssignOperator) op3;
+        AssignOperator assignOp2 = (AssignOperator) op4;
+
+        // Checks whether open-record-constructor is called to create a record in the first assign operator - assignOp2
+        ILogicalExpression assignExpr = assignOp2.getExpressions().get(0).getValue();
+        if (assignExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        FunctionIdentifier fid = ((ScalarFunctionCallExpression) assignExpr).getFunctionIdentifier();
+        // Compare identifiers by value, not by reference, so that an equal identifier held in a
+        // different instance is still recognized.
+        if (!AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR.equals(fid)) {
+            return false;
+        }
+
+        // Checks whether an external function is called in the second assign operator - assignOp1
+        assignExpr = assignOp1.getExpressions().get(0).getValue();
+        if (assignExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) assignExpr;
+        fid = funcExpr.getFunctionIdentifier();
+
+        // Checks whether this is an internal function call. Then, we return false.
+        if (AsterixBuiltinFunctions.getBuiltinFunctionIdentifier(fid) != null) {
+            return false;
+        }
+
+        AsterixExternalScalarFunctionInfo finfo = (AsterixExternalScalarFunctionInfo) funcExpr.getFunctionInfo();
+        // A record cast only applies when the function declares a record as its first argument.
+        // Skip any other signature instead of failing on get(0) or on the cast below.
+        if (finfo.getArgumenTypes().isEmpty() || !(finfo.getArgumenTypes().get(0) instanceof ARecordType)) {
+            return false;
+        }
+        ARecordType requiredRecordType = (ARecordType) finfo.getArgumenTypes().get(0);
+
+        List<LogicalVariable> recordVar = new ArrayList<LogicalVariable>(assignOp2.getVariables());
+
+        IVariableTypeEnvironment env = assignOp2.computeOutputTypeEnvironment(context);
+        IAType inputRecordType = (IAType) env.getVarType(recordVar.get(0));
+
+        /** the input record type can be an union type -- for the case when it comes from a subplan or left-outer join */
+        boolean checkNull = false;
+        while (NonTaggedFormatUtil.isOptional(inputRecordType)) {
+            /** while-loop for the case there is a nested multi-level union */
+            inputRecordType = ((AUnionType) inputRecordType).getNullableType();
+            checkNull = true;
+        }
+
+        /** see whether the input record type needs to be casted */
+        boolean cast = !IntroduceDynamicTypeCastRule.compatible(requiredRecordType, inputRecordType);
+
+        if (checkNull) {
+            // The not-null wrapper yields a new variable; the cast below must be applied to it.
+            recordVar.set(0, IntroduceDynamicTypeCastRule.addWrapperFunction(requiredRecordType, recordVar.get(0),
+                    assignOp1, context, AsterixBuiltinFunctions.NOT_NULL));
+        }
+        if (cast) {
+            IntroduceDynamicTypeCastRule.addWrapperFunction(requiredRecordType, recordVar.get(0), assignOp1, context,
+                    AsterixBuiltinFunctions.CAST_RECORD);
+        }
+        return cast || checkNull;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
new file mode 100644
index 0000000..64c34ed
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Dynamically cast a variable from its type to a specified required type, in a
+ * recursive way. It enables: 1. bag-based fields in a record, 2. bidirectional
+ * cast of an open field and a matched closed field, and 3. put in null fields
+ * when necessary.
+ * Here is an example: A record { "hobby": {{"music", "coding"}}, "id": "001",
+ * "name": "Person Three"} which conforms to closed type ( id: string, name:
+ * string, hobby: {{string}}? ) can be cast to an open type (id: string ), or
+ * vice versa.
+ * However, if the input record is a variable, then we don't know its exact
+ * field layout at compile time. For example, records conforming to the same
+ * type can have different field orderings and different open parts. That's why
+ * we need dynamic type casting.
+ * Note that as we can see in the example, the ordering of fields of a record is
+ * not required. Since the open/closed part of a record has completely different
+ * underlying memory/storage layout, a cast-record function will change the
+ * layout as specified at runtime.
+ * Implementation wise, this rule checks the target dataset type and the input
+ * record type, and if the types are different, then it plugs in an assign with
+ * a cast-record function, and projects away the original (uncast) field.
+ */
+public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false; // never rewrites in this hook; the rule's logic lives in rewritePost
+    }
+
+    /**
+     * Wraps the record variable consumed by an INSERT (below a SINK), or by a DISTRIBUTE_RESULT annotated with an
+     * "output-record-type", in not-null and/or cast-record calls when its static type calls for it.
+     * @return true if a not-null and/or cast-record wrapper was requested; false otherwise
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        // Depending on the operator type, we need to extract the following pieces of information.
+        AbstractLogicalOperator op;
+        ARecordType requiredRecordType;
+        LogicalVariable recordVar;
+
+        // We identify INSERT and DISTRIBUTE_RESULT operators.
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        switch (op1.getOperatorTag()) {
+            case SINK: {
+                // pattern match: sink insert assign; resulting plan: sink-insert-project-assign
+                AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+                if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE) {
+                    return false;
+                }
+                InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op2;
+                if (insertDeleteOp.getOperation() == InsertDeleteOperator.Kind.DELETE) {
+                    return false;
+                }
+                // Remember this is the operator we need to modify
+                op = insertDeleteOp;
+
+                // Derive the required ARecordType based on the schema of the AqlDataSource
+                AqlDataSource dataSource = (AqlDataSource) insertDeleteOp.getDataSource();
+                IAType[] schemaTypes = (IAType[]) dataSource.getSchemaTypes();
+                requiredRecordType = (ARecordType) schemaTypes[schemaTypes.length - 1];
+
+                // Derive the Variable which we will potentially wrap with cast/null functions
+                ILogicalExpression expr = insertDeleteOp.getPayloadExpression().getValue();
+                List<LogicalVariable> payloadVars = new ArrayList<LogicalVariable>();
+                expr.getUsedVariables(payloadVars);
+                if (payloadVars.isEmpty()) {
+                    throw new AlgebricksException("The insert payload expression does not reference a variable!");
+                }
+                recordVar = payloadVars.get(0);
+                break;
+            }
+            case DISTRIBUTE_RESULT: {
+                // First, see if there was an output-record-type specified
+                requiredRecordType = (ARecordType) op1.getAnnotations().get("output-record-type");
+                if (requiredRecordType == null) {
+                    return false;
+                }
+                // Remember this is the operator we need to modify
+                op = op1;
+                // The Variable we want is the (hopefully singular, hopefully record-typed) live variable
+                // of the singular input operator of the DISTRIBUTE_RESULT
+                if (op.getInputs().size() > 1) {
+                    // Hopefully not possible?
+                    throw new AlgebricksException(
+                            "output-record-type defined for expression with multiple input operators");
+                }
+                AbstractLogicalOperator input = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+                List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getLiveVariables(input, liveVars);
+                if (liveVars.size() > 1) {
+                    throw new AlgebricksException(
+                            "Expression with multiple fields cannot be cast to output-record-type!");
+                }
+                if (liveVars.isEmpty()) {
+                    throw new AlgebricksException("Expression without fields cannot be cast to output-record-type!");
+                }
+                recordVar = liveVars.get(0);
+                break;
+            }
+            default: {
+                return false;
+            }
+        }
+
+        // Derive the statically-computed type of the record
+        IVariableTypeEnvironment env = op.computeOutputTypeEnvironment(context);
+        IAType inputRecordType = (IAType) env.getVarType(recordVar);
+
+        /** the input record type can be a union type -- for the case when it comes from a subplan or left-outer join */
+        boolean checkNull = false;
+        while (NonTaggedFormatUtil.isOptional(inputRecordType)) {
+            /** while-loop for the case there is a nested multi-level union */
+            inputRecordType = ((AUnionType) inputRecordType).getNullableType();
+            checkNull = true;
+        }
+
+        /** see whether the input record type needs to be cast */
+        boolean cast = !compatible(requiredRecordType, inputRecordType);
+        if (checkNull) {
+            recordVar = addWrapperFunction(requiredRecordType, recordVar, op, context, AsterixBuiltinFunctions.NOT_NULL);
+        }
+        if (cast) {
+            addWrapperFunction(requiredRecordType, recordVar, op, context, AsterixBuiltinFunctions.CAST_RECORD);
+        }
+        return cast || checkNull;
+    }
+
+    /**
+     * Inject an assign that applies the function fd to recordVar directly above the operator producing it.
+     * The inputs of parent are searched recursively; parents on the path have recordVar substituted.
+     * @param requiredRecordType
+     *            the required record type, set on the injected call along with the variable's current type
+     * @param recordVar
+     *            the record variable to be wrapped
+     * @param parent
+     *            the current parent operator to be rewritten
+     * @param context
+     *            the optimization context
+     * @param fd
+     *            the function to be injected
+     * @return the variable of the injected assign; null if no operator below parent produces recordVar
+     * @throws AlgebricksException
+     */
+    public static LogicalVariable addWrapperFunction(ARecordType requiredRecordType, LogicalVariable recordVar,
+            ILogicalOperator parent, IOptimizationContext context, FunctionIdentifier fd) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> opRefs = parent.getInputs();
+        for (int index = 0; index < opRefs.size(); index++) {
+            Mutable<ILogicalOperator> opRef = opRefs.get(index);
+            ILogicalOperator op = opRef.getValue();
+
+            /** get the variables produced by this input and its output type environment */
+            List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(op, producedVars);
+            IVariableTypeEnvironment env = op.computeOutputTypeEnvironment(context);
+            for (int i = 0; i < producedVars.size(); i++) {
+                LogicalVariable var = producedVars.get(i);
+                if (var.equals(recordVar)) {
+                    /** insert an assign operator to call the function on-top-of the variable */
+                    IAType actualType = (IAType) env.getVarType(var);
+                    AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+                            FunctionUtils.getFunctionInfo(fd));
+                    cast.getArguments()
+                            .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
+                    /** enforce the required record type */
+                    TypeComputerUtilities.setRequiredAndInputTypes(cast, requiredRecordType, actualType);
+                    LogicalVariable newAssignVar = context.newVar();
+                    AssignOperator newAssignOperator = new AssignOperator(newAssignVar,
+                            new MutableObject<ILogicalExpression>(cast));
+                    newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op));
+                    opRef.setValue(newAssignOperator); // splice the new assign between parent and op
+                    context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);
+                    newAssignOperator.computeOutputTypeEnvironment(context);
+                    VariableUtilities.substituteVariables(parent, recordVar, newAssignVar, context);
+                    return newAssignVar;
+                }
+            }
+            /** recursively descend to the operator that produced the recordVar */
+            LogicalVariable replacedVar = addWrapperFunction(requiredRecordType, recordVar, op, context, fd);
+            if (replacedVar != null) {
+                /** substitute the recordVar by the replacedVar in parent, which may use recordVar */
+                VariableUtilities.substituteVariables(parent, recordVar, replacedVar, context);
+                return replacedVar;
+            }
+        }
+        return null; // recordVar is not produced by any operator below parent
+    }
+
+    /**
+     * Decide whether the input type already matches the required record type, so that no cast is needed.
+     * Both must agree on openness and, position by position, on field names and (unwrapped) field types;
+     * an input field whose unwrapped type is NULL is accepted for any required field type.
+     * @param reqType
+     *            the required record type
+     * @param inputType
+     *            the type of the input; must be ANY or a record type
+     * @return true if compatible; false otherwise (in particular for an input of type ANY)
+     * @throws AlgebricksException if the input type is neither ANY nor a record type
+     */
+    public static boolean compatible(ARecordType reqType, IAType inputType) throws AlgebricksException {
+        ATypeTag inputTag = inputType.getTypeTag();
+        if (inputTag == ATypeTag.ANY) {
+            return false;
+        }
+        if (inputTag != ATypeTag.RECORD) {
+            throw new AlgebricksException("The input type " + inputType + " is not a valid record type!");
+        }
+        ARecordType inputRecType = (ARecordType) inputType;
+        if (reqType.isOpen() != inputRecType.isOpen()) {
+            return false;
+        }
+        IAType[] requiredFieldTypes = reqType.getFieldTypes();
+        String[] requiredFieldNames = reqType.getFieldNames();
+        IAType[] actualFieldTypes = inputRecType.getFieldTypes();
+        String[] actualFieldNames = inputRecType.getFieldNames();
+        if (requiredFieldTypes.length != actualFieldTypes.length) {
+            return false;
+        }
+        for (int pos = 0; pos < requiredFieldTypes.length; pos++) {
+            if (!requiredFieldNames[pos].equals(actualFieldNames[pos])) {
+                return false;
+            }
+            boolean requiredIsOptional = NonTaggedFormatUtil.isOptional(requiredFieldTypes[pos]);
+            boolean actualIsOptional = NonTaggedFormatUtil.isOptional(actualFieldTypes[pos]);
+            if (actualIsOptional && !requiredIsOptional) {
+                /** an optional input field cannot feed a required field that is not optional */
+                return false;
+            }
+            IAType requiredInner = requiredIsOptional
+                    ? ((AUnionType) requiredFieldTypes[pos]).getNullableType() : requiredFieldTypes[pos];
+            IAType actualInner = actualIsOptional
+                    ? ((AUnionType) actualFieldTypes[pos]).getNullableType() : actualFieldTypes[pos];
+            if (actualInner.getTypeTag() != ATypeTag.NULL && !requiredInner.equals(actualInner)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceEnforcedListTypeRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceEnforcedListTypeRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceEnforcedListTypeRule.java
new file mode 100644
index 0000000..543a132
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceEnforcedListTypeRule.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.optimizer.rules.typecast.StaticTypeCastUtil;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractAssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule enforces types for function expressions which contain list constructor function calls.
+ * The list constructor is very special because a nested list is of type List<ANY>.
+ * However, the bottom-up type inference (InferTypeRule in algebricks) does not infer that, so the type is
+ * enforced in this ASTERIX rule via {@link StaticTypeCastUtil#rewriteListExpr} to keep algebricks general.
+ */
+public class IntroduceEnforcedListTypeRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false; // never rewrites in this hook; the rule's logic lives in rewritePost
+    }
+
+    /**
+     * Rewrites the list-constructor types in the expressions of an ASSIGN or UNNEST operator. Operators already in
+     * the context's don't-apply set for this rule are skipped; every other visited operator is added to it.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator visited = opRef.getValue();
+        if (context.checkIfInDontApplySet(this, visited)) {
+            return false;
+        }
+        AbstractLogicalOperator logicalOp = (AbstractLogicalOperator) visited;
+        context.addToDontApplySet(this, visited);
+        // pick the expressions whose list constructor types may have to be rewritten
+        List<Mutable<ILogicalExpression>> targets;
+        switch (logicalOp.getOperatorTag()) {
+            case ASSIGN:
+                targets = ((AbstractAssignOperator) logicalOp).getExpressions();
+                break;
+            case UNNEST:
+                targets = Collections.singletonList(((AbstractUnnestOperator) logicalOp).getExpressionRef());
+                break;
+            default:
+                return false;
+        }
+        return rewriteExpressions(targets, logicalOp.computeOutputTypeEnvironment(context));
+    }
+
+    /** Applies StaticTypeCastUtil.rewriteListExpr to each function-call expression; true if any call reports a change. */
+    private boolean rewriteExpressions(List<Mutable<ILogicalExpression>> expressions, IVariableTypeEnvironment env)
+            throws AlgebricksException {
+        boolean anyRewritten = false;
+        for (Mutable<ILogicalExpression> ref : expressions) {
+            if (ref.getValue().getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                continue;
+            }
+            AbstractFunctionCallExpression call = (AbstractFunctionCallExpression) ref.getValue();
+            IAType callType = (IAType) env.getType(call);
+            if (StaticTypeCastUtil.rewriteListExpr(call, callType, callType, env)) {
+                TypeComputerUtilities.resetRequiredAndInputTypes(call);
+                anyRewritten = true;
+            }
+        }
+        return anyRewritten;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
new file mode 100644
index 0000000..782f7ed
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.algebra.operators.CommitOperator;
+import edu.uci.ics.asterix.algebra.operators.physical.BTreeSearchPOperator;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataImplConfig;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class IntroduceInstantLockSearchCallbackRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false; // never rewrites in this hook; the rule's logic lives in rewritePost
+    }
+
+    /**
+     * Walks the plan below op and counts in dataSourcesMap the primary-index searches (UNNEST_MAP, keyed by index
+     * name) and scans (DATASOURCESCAN, keyed by datasource name); tag and physical operator are those first seen.
+     */
+    private void extractDataSourcesInfo(AbstractLogicalOperator op,
+            Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap) {
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) inputRef.getValue();
+            LogicalOperatorTag childTag = child.getOperatorTag();
+            if (childTag == LogicalOperatorTag.DATASOURCESCAN) {
+                DataSourceScanOperator scanOp = (DataSourceScanOperator) child;
+                String sourceName = ((AqlDataSource) scanOp.getDataSource()).getDatasourceName();
+                if (!dataSourcesMap.containsKey(sourceName)) {
+                    dataSourcesMap.put(sourceName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
+                            LogicalOperatorTag.DATASOURCESCAN, scanOp.getPhysicalOperator()));
+                } else {
+                    ++(dataSourcesMap.get(sourceName).first);
+                }
+            } else if (childTag == LogicalOperatorTag.UNNEST_MAP) {
+                UnnestMapOperator searchOp = (UnnestMapOperator) child;
+                ILogicalExpression searchExpr = searchOp.getExpressionRef().getValue();
+                if (searchExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression searchCall = (AbstractFunctionCallExpression) searchExpr;
+                    FunctionIdentifier calledFunction = searchCall.getFunctionIdentifier();
+                    if (!calledFunction.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+                        throw new IllegalStateException();
+                    }
+                    AccessMethodJobGenParams params = new AccessMethodJobGenParams();
+                    params.readFromFuncArgs(searchCall.getArguments());
+                    boolean primary = params.isPrimaryIndex();
+                    String indexName = params.getIndexName();
+                    if (primary && dataSourcesMap.containsKey(indexName)) {
+                        ++(dataSourcesMap.get(indexName).first);
+                    } else if (primary) {
+                        dataSourcesMap.put(indexName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
+                                LogicalOperatorTag.UNNEST_MAP, searchOp.getPhysicalOperator()));
+                    }
+                }
+            }
+            extractDataSourcesInfo(child, dataSourcesMap);
+        }
+    }
+
+    /**
+     * Tells whether op is a root this rule handles: it must already have a physical operator and be either an
+     * extension operator delegating to a CommitOperator, a DISTRIBUTE_RESULT or a WRITE_RESULT.
+     */
+    private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
+        if (op.getPhysicalOperator() == null) {
+            return false;
+        }
+
+        LogicalOperatorTag tag = op.getOperatorTag();
+        if (tag == LogicalOperatorTag.EXTENSION_OPERATOR) {
+            // an extension operator qualifies only when its delegate is a CommitOperator
+            return ((ExtensionOperator) op).getDelegate() instanceof CommitOperator;
+        }
+        return tag == LogicalOperatorTag.DISTRIBUTE_RESULT || tag == LogicalOperatorTag.WRITE_RESULT;
+    }
+
+    /**
+     * For a qualifying root operator, sets an AqlMetadataImplConfig created with true on the physical operator
+     * of every name that extractDataSourcesInfo counted exactly once in the plan below that root.
+     *
+     * @return true if the config was set on at least one physical operator; false otherwise
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator rootOp = (AbstractLogicalOperator) opRef.getValue();
+        if (!checkIfRuleIsApplicable(rootOp)) {
+            return false;
+        }
+        Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> occurrences =
+                new HashMap<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>();
+        extractDataSourcesInfo(rootOp, occurrences);
+
+        boolean configured = false;
+        for (Iterator<Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>> it = occurrences
+                .entrySet().iterator(); it.hasNext();) {
+            Triple<Integer, LogicalOperatorTag, IPhysicalOperator> occurrence = it.next().getValue();
+            if (occurrence.first != 1) {
+                // names counted more than once are left untouched
+                continue;
+            }
+            AqlMetadataImplConfig instantLockConfig = new AqlMetadataImplConfig(true);
+            if (occurrence.second == LogicalOperatorTag.UNNEST_MAP) {
+                ((BTreeSearchPOperator) occurrence.third).setImplConfig(instantLockConfig);
+            } else {
+                ((DataSourceScanPOperator) occurrence.third).setImplConfig(instantLockConfig);
+            }
+            configured = true;
+        }
+        return configured;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
new file mode 100644
index 0000000..acd3b13
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.asterix.metadata.declared.DatasetDataSource;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Rewrite rule that places a materialization step under an insert/delete operator whose input reads the very
+ * dataset being modified.
+ * <p>
+ * The input subtree of an {@link InsertDeleteOperator} is searched for a data-source scan of, or a primary-index
+ * search on, a dataset whose name equals the name of the target dataset. When one is found, a
+ * {@link MaterializeOperator} is spliced in between the insert/delete operator and its first input.
+ * <p>
+ * NOTE(review): presumably this lets the self-scan finish before the dataset is modified -- confirm.
+ */
+public class IntroduceMaterializationForInsertWithSelfScanRule implements IAlgebraicRewriteRule {
+
+    /**
+     * Never rewrites anything; all work of this rule happens in the post-order pass.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Inserts a {@link MaterializeOperator} below an insert/delete operator that scans its own target dataset.
+     *
+     * @param opRef
+     *            reference to the operator being visited
+     * @param context
+     *            optimization context, used to recompute the type environments of the touched operators
+     * @return {@code true} if a materialize operator was introduced, {@code false} otherwise
+     * @throws AlgebricksException
+     *             if recomputing a type environment fails
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator visitedOp = (AbstractLogicalOperator) opRef.getValue();
+        if (visitedOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE) {
+            return false;
+        }
+
+        InsertDeleteOperator insertOp = (InsertDeleteOperator) visitedOp;
+        DatasetDataSource targetSource = (DatasetDataSource) insertOp.getDataSource();
+        String targetDatasetName = targetSource.getDataset().getDatasetName();
+        if (!checkIfInsertAndScanDatasetsSame(visitedOp, targetDatasetName)) {
+            return false;
+        }
+
+        // Build the materialize operator on top of the insert's current first input.
+        MaterializeOperator materializeOp = new MaterializeOperator();
+        materializeOp.setPhysicalOperator(new MaterializePOperator(true));
+        ILogicalOperator formerInput = insertOp.getInputs().get(0).getValue();
+        materializeOp.getInputs().add(new MutableObject<ILogicalOperator>(formerInput));
+        context.computeAndSetTypeEnvironmentForOperator(materializeOp);
+
+        // Make the materialize operator the sole input of the insert/delete operator.
+        insertOp.getInputs().clear();
+        insertOp.getInputs().add(new MutableObject<ILogicalOperator>(materializeOp));
+        context.computeAndSetTypeEnvironmentForOperator(insertOp);
+        return true;
+    }
+
+    /**
+     * Depth-first search of the inputs of {@code op} for an operator that reads the dataset named
+     * {@code insertDatasetName}. The operator {@code op} itself is not inspected, only its descendants.
+     *
+     * @return {@code true} as soon as a matching scan or primary-index search is found, {@code false} if the
+     *         whole subtree was visited without a match
+     * @throws IllegalStateException
+     *             if an unnest-map operator holds a function call other than the index-search function
+     */
+    private boolean checkIfInsertAndScanDatasetsSame(AbstractLogicalOperator op, String insertDatasetName) {
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) inputRef.getValue();
+            LogicalOperatorTag childTag = child.getOperatorTag();
+
+            if (childTag == LogicalOperatorTag.UNNEST_MAP) {
+                if (searchesPrimaryIndexOf((UnnestMapOperator) child, insertDatasetName)) {
+                    return true;
+                }
+            } else if (childTag == LogicalOperatorTag.DATASOURCESCAN) {
+                if (scansDataset((DataSourceScanOperator) child, insertDatasetName)) {
+                    return true;
+                }
+            }
+
+            // No direct hit on this child: keep looking further down its own inputs.
+            if (checkIfInsertAndScanDatasetsSame(child, insertDatasetName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tells whether the unnest-map operator is a primary-index search whose index name equals
+     * {@code datasetName}. An unnest-map whose expression is not a function call never matches.
+     */
+    private boolean searchesPrimaryIndexOf(UnnestMapOperator unnestMapOp, String datasetName) {
+        ILogicalExpression searchExpr = unnestMapOp.getExpressionRef().getValue();
+        if (searchExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression searchCall = (AbstractFunctionCallExpression) searchExpr;
+        FunctionIdentifier calledFunction = searchCall.getFunctionIdentifier();
+        if (!calledFunction.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+            throw new IllegalStateException();
+        }
+        AccessMethodJobGenParams searchParams = new AccessMethodJobGenParams();
+        searchParams.readFromFuncArgs(searchCall.getArguments());
+        boolean primary = searchParams.isPrimaryIndex();
+        String searchedIndexName = searchParams.getIndexName();
+        return primary && searchedIndexName.compareTo(datasetName) == 0;
+    }
+
+    /**
+     * Tells whether the scan reads a dataset named {@code datasetName}. Feed and loadable data sources are
+     * never considered a match.
+     */
+    private boolean scansDataset(DataSourceScanOperator scanOp, String datasetName) {
+        AqlDataSource scannedSource = (AqlDataSource) scanOp.getDataSource();
+        AqlDataSourceType sourceType = scannedSource.getDatasourceType();
+        if (sourceType == AqlDataSourceType.FEED || sourceType == AqlDataSourceType.LOADABLE) {
+            return false;
+        }
+        String scannedName = ((DatasetDataSource) scannedSource).getDataset().getDatasetName();
+        return scannedName.compareTo(datasetName) == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
new file mode 100644
index 0000000..f4349c3
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.asterix.metadata.declared.FeedDataSource;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Rewrite rule that inserts a random-partitioning exchange between a feed scan and the assign operator directly
+ * above it.
+ * <p>
+ * The rule fires only for an assign operator whose first input is a data-source scan over a feed data source
+ * whose feed has an applied function. The exchange uses a node domain whose cardinality is the feed data
+ * source's compute cardinality; the same cardinality is set as a constraint on the assign's physical operator.
+ */
+public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebraicRewriteRule {
+
+    /**
+     * Applies the rewrite on the pre-order pass.
+     *
+     * @param opRef
+     *            reference to the operator being visited
+     * @param context
+     *            optimization context used to compute properties and the type environment of the new exchange
+     * @return {@code true} if an exchange operator was introduced, {@code false} if the pattern did not match
+     * @throws AlgebricksException
+     *             if computing properties or the type environment of the exchange fails
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator candidate = opRef.getValue();
+        if (candidate.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+
+        Mutable<ILogicalOperator> inputRef = candidate.getInputs().get(0);
+        ILogicalOperator input = inputRef.getValue();
+        if (input.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+            return false;
+        }
+
+        DataSourceScanOperator scanOp = (DataSourceScanOperator) input;
+        AqlDataSource scannedSource = (AqlDataSource) scanOp.getDataSource();
+        if (scannedSource.getDatasourceType() != AqlDataSourceType.FEED) {
+            return false;
+        }
+
+        FeedDataSource feedSource = (FeedDataSource) scannedSource;
+        Feed feed = feedSource.getFeed();
+        if (feed.getAppliedFunction() == null) {
+            // Nothing is applied to the feed's records, so no exchange is introduced.
+            return false;
+        }
+
+        ExchangeOperator exchangeOp = new ExchangeOperator();
+        INodeDomain computeDomain = newComputeDomain(feedSource);
+        exchangeOp.setPhysicalOperator(new RandomPartitionPOperator(computeDomain));
+
+        // Splice the exchange in between the assign and the scan.
+        inputRef.setValue(exchangeOp);
+        exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp));
+
+        // The exchange takes over the execution mode of the scan it now sits on.
+        ExecutionMode scanMode = ((AbstractLogicalOperator) scanOp).getExecutionMode();
+        exchangeOp.setExecutionMode(scanMode);
+        exchangeOp.computeDeliveredPhysicalProperties(context);
+        context.computeAndSetTypeEnvironmentForOperator(exchangeOp);
+
+        AssignOperator assignOp = (AssignOperator) candidate;
+        AssignPOperator assignPhysicalOp = (AssignPOperator) assignOp.getPhysicalOperator();
+        assignPhysicalOp.setCardinalityConstraint(computeDomain.cardinality());
+
+        return true;
+    }
+
+    /**
+     * Never rewrites anything; all work of this rule happens in the pre-order pass.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Creates a node domain that is the same only as itself and whose cardinality is read from the given feed
+     * data source each time it is asked for.
+     */
+    private static INodeDomain newComputeDomain(final FeedDataSource feedSource) {
+        return new INodeDomain() {
+            @Override
+            public boolean sameAs(INodeDomain domain) {
+                return domain == this;
+            }
+
+            @Override
+            public Integer cardinality() {
+                return feedSource.getComputeCardinality();
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
new file mode 100644
index 0000000..d44d762
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.algebra.operators.CommitOperator;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule searches for project and assign operators in an insert/delete/update plan and passes a hint to all
+ * of them. The hint is used by the project and assign operators so that frames are pushed to the next operator
+ * without waiting until they get full. The purpose of this is to reduce the time of holding exclusive locks on
+ * the keys that have been inserted, and also to allow feeds batching to work correctly.
+ * <p>
+ * The rule fires only on an extension operator that wraps a {@link CommitOperator}, and only if every project
+ * and assign operator reachable through its inputs already has a physical operator assigned.
+ *
+ * @author salsubaiee
+ */
+public class IntroduceRapidFrameFlushProjectAssignRule implements IAlgebraicRewriteRule {
+
+    /**
+     * Never rewrites anything; all work of this rule happens in the post-order pass.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Decides whether the rule may be applied with {@code op} as the root.
+     *
+     * @param op
+     *            the operator being visited
+     * @return {@code true} if {@code op} is an extension operator delegating to a {@link CommitOperator} and
+     *         every project/assign operator below it has a physical operator; {@code false} otherwise
+     */
+    private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
+        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+            return false;
+        }
+        ExtensionOperator extensionOp = (ExtensionOperator) op;
+        if (!(extensionOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        // The commit check applies to the root only; the descendants are validated by a separate traversal so
+        // that a missing physical operator anywhere in the subtree is detected, not just on the direct inputs.
+        return allProjectsAndAssignsHavePhysicalOperator(op);
+    }
+
+    /**
+     * Recursively verifies that every project and assign operator reachable through the inputs of {@code op}
+     * has a physical operator, which {@link #changeRule(AbstractLogicalOperator)} dereferences without a
+     * null check.
+     *
+     * @return {@code false} as soon as a project/assign without a physical operator is found, {@code true}
+     *         if none exists in the subtree
+     */
+    private boolean allProjectsAndAssignsHavePhysicalOperator(AbstractLogicalOperator op) {
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) inputRef.getValue();
+            LogicalOperatorTag tag = descendantOp.getOperatorTag();
+
+            boolean isProjectOrAssign = tag == LogicalOperatorTag.PROJECT || tag == LogicalOperatorTag.ASSIGN;
+            if (isProjectOrAssign && descendantOp.getPhysicalOperator() == null) {
+                return false;
+            }
+            if (!allProjectsAndAssignsHavePhysicalOperator(descendantOp)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Turns on rapid frame flushing on the physical operator of every project and assign operator reachable
+     * through the inputs of {@code op}.
+     *
+     * @return {@code true} if at least one operator anywhere in the subtree was given the hint
+     */
+    private boolean changeRule(AbstractLogicalOperator op) {
+        boolean planModified = false;
+        for (Mutable<ILogicalOperator> inputRef : op.getInputs()) {
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) inputRef.getValue();
+
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                ProjectOperator projectOp = (ProjectOperator) descendantOp;
+                StreamProjectPOperator physicalOp = (StreamProjectPOperator) projectOp.getPhysicalOperator();
+                physicalOp.setRapidFrameFlush(true);
+                planModified = true;
+            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                AssignOperator assignOp = (AssignOperator) descendantOp;
+                AssignPOperator physicalOp = (AssignPOperator) assignOp.getPhysicalOperator();
+                physicalOp.setRapidFrameFlush(true);
+                planModified = true;
+            }
+            // Always recurse (the hint must reach the whole subtree) and report changes made further down.
+            if (changeRule(descendantOp)) {
+                planModified = true;
+            }
+        }
+        return planModified;
+    }
+
+    /**
+     * Applies the rapid-frame-flush hint below a commit operator.
+     *
+     * @param opRef
+     *            reference to the operator being visited
+     * @param context
+     *            optimization context (not used by this rule)
+     * @return {@code true} if the hint was set on at least one project or assign operator
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (!checkIfRuleIsApplicable(op)) {
+            return false;
+        }
+        return changeRule(op);
+    }
+}
\ No newline at end of file