You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:23 UTC

[35/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
new file mode 100644
index 0000000..5a4b52a
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Contributes the runtime operator for an unnest-map representing a BTree search.
+ * <p>
+ * When the search requires its input to be broadcast, a primary-index search that optimizes an
+ * equality condition relaxes that requirement to partitioning on the search-key variables, combined
+ * with a local ascending sort on those same variables.
+ */
+public class BTreeSearchPOperator extends IndexSearchPOperator {
+
+    private final List<LogicalVariable> lowKeyVarList;
+    private final List<LogicalVariable> highKeyVarList;
+    private final boolean isPrimaryIndex;
+    private final boolean isEqCondition;
+    // Opaque configuration object; it is only forwarded to the metadata provider when the runtime is built.
+    private Object implConfig;
+
+    /**
+     * @param idx
+     *            the index being searched (forwarded to the superclass)
+     * @param requiresBroadcast
+     *            whether the search input must be broadcast (forwarded to the superclass)
+     * @param isPrimaryIndex
+     *            whether the searched index is the primary index
+     * @param isEqCondition
+     *            whether the search optimizes an equality condition
+     * @param lowKeyVarList
+     *            variables that make up the low search key
+     * @param highKeyVarList
+     *            variables that make up the high search key
+     */
+    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+            boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList,
+            List<LogicalVariable> highKeyVarList) {
+        super(idx, requiresBroadcast);
+        this.isPrimaryIndex = isPrimaryIndex;
+        this.isEqCondition = isEqCondition;
+        this.lowKeyVarList = lowKeyVarList;
+        this.highKeyVarList = highKeyVarList;
+    }
+
+    public void setImplConfig(Object implConfig) {
+        this.implConfig = implConfig;
+    }
+
+    public Object getImplConfig() {
+        return implConfig;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.BTREE_SEARCH;
+    }
+
+    /**
+     * Builds the BTree search runtime through the metadata provider and wires it into the job.
+     * <p>
+     * Nothing is contributed when the unnest expression is a function call other than
+     * {@code INDEX_SEARCH}.
+     *
+     * @throws IllegalStateException
+     *             if the unnest expression of {@code op} is not a function call
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        UnnestMapOperator unnestMap = (UnnestMapOperator) op;
+        ILogicalExpression unnestExpr = unnestMap.getExpressionRef().getValue();
+        if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            // Carry the offending tag so that a malformed plan can be diagnosed from the stack trace.
+            throw new IllegalStateException("BTree search expects a function-call unnest expression but found "
+                    + unnestExpr.getExpressionTag());
+        }
+        AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr;
+        FunctionIdentifier funcIdent = unnestFuncExpr.getFunctionIdentifier();
+        if (!funcIdent.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+            return;
+        }
+
+        // The job-generation parameters are encoded as the arguments of the index-search function.
+        BTreeJobGenParams jobGenParams = new BTreeJobGenParams();
+        jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
+        int[] lowKeyIndexes = getKeyIndexes(jobGenParams.getLowKeyVarList(), inputSchemas);
+        int[] highKeyIndexes = getKeyIndexes(jobGenParams.getHighKeyVarList(), inputSchemas);
+
+        int[] minFilterFieldIndexes = getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas);
+        int[] maxFilterFieldIndexes = getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas);
+
+        AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        Dataset dataset = metadataProvider.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+
+        // With retain-input the output consists of all live variables of the unnest-map rather than
+        // only the variables the unnest-map itself declares.
+        List<LogicalVariable> outputVars = unnestMap.getVariables();
+        if (jobGenParams.getRetainInput()) {
+            outputVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(unnestMap, outputVars);
+        }
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime(
+                builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(),
+                jobGenParams.getRetainNull(), dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
+                jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive(), implConfig, minFilterFieldIndexes,
+                maxFilterFieldIndexes);
+
+        builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
+        builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
+
+        ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
+    }
+
+    /**
+     * Computes what this operator requires from its single input.
+     * <ul>
+     * <li>No broadcast required: defers to the superclass.</li>
+     * <li>Broadcast required, primary index and equality condition: requires partitioning on the search-key
+     * variables and a local ascending order on them.</li>
+     * <li>Broadcast required otherwise: requires a broadcast partitioning of the input.</li>
+     * </ul>
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (!requiresBroadcast) {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+        if (isPrimaryIndex && isEqCondition) {
+            // For primary indexes optimizing an equality condition we can reduce the broadcast
+            // requirement to partitioning on the search keys.
+            ListSet<LogicalVariable> searchKeyVars = new ListSet<LogicalVariable>();
+            searchKeyVars.addAll(lowKeyVarList);
+            searchKeyVars.addAll(highKeyVarList);
+            // Also, add a local sorting property to enforce a sort before the primary-index operator.
+            List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+            for (LogicalVariable orderVar : searchKeyVars) {
+                orderColumns.add(new OrderColumn(orderVar, OrderKind.ASC));
+            }
+            List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+            propsLocal.add(new LocalOrderProperty(orderColumns));
+            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, null),
+                    propsLocal);
+        } else {
+            pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+        }
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
new file mode 100644
index 0000000..6722cbe
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Physical operator that contributes a {@link CommitRuntimeFactory} micro operator to the job.
+ * It imposes no requirements on its input and delivers a copy of its input's physical properties.
+ */
+public class CommitPOperator extends AbstractPhysicalOperator {
+
+    private final JobId jobId;
+    private final int datasetId;
+    private final List<LogicalVariable> primaryKeyLogicalVars;
+
+    /**
+     * @param jobId
+     *            id handed to the commit runtime factory
+     * @param datasetId
+     *            id of the dataset, handed to the commit runtime factory
+     * @param primaryKeyLogicalVars
+     *            variables that are resolved to primary-key field positions in the input schema
+     */
+    public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    @Override
+    public String toString() {
+        return "COMMIT";
+    }
+
+    /** Returns the empty requirements for a unary operator. */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    /** Delivers a clone of the physical properties delivered by the first input. */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator input = op.getInputs().get(0).getValue();
+        deliveredProperties = ((AbstractLogicalOperator) input).getDeliveredPhysicalProperties().clone();
+    }
+
+    /**
+     * Creates the commit runtime factory for {@code op}, registers it as a micro operator and connects it
+     * to the operator's first input.
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        // Primary-key variables are located in the schema of the first (and only used) input.
+        int[] pkFieldPositions = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
+
+        AqlMetadataProvider provider = (AqlMetadataProvider) context.getMetadataProvider();
+        boolean temporaryDatasetWriteJob = provider.isTemporaryDatasetWriteJob();
+        boolean writeTransaction = provider.isWriteTransaction();
+        CommitRuntimeFactory commitFactory = new CommitRuntimeFactory(jobId, datasetId, pkFieldPositions,
+                temporaryDatasetWriteJob, writeTransaction);
+        builder.contributeMicroOperator(op, commitFactory, recordDescriptor);
+
+        ILogicalOperator input = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(input, 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
new file mode 100644
index 0000000..69c8f78
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
+
+/**
+ * Push runtime that processes every tuple of each incoming frame: for a temporary-dataset write job it
+ * notifies the operation tracker through the transaction context, otherwise it forms an entity-commit
+ * log record for the tuple and hands it to the log manager.
+ */
+public class CommitRuntime implements IPushRuntime {
+
+    private static final long SEED = 0L;
+
+    private final IHyracksTaskContext hyracksTaskCtx;
+    private final ITransactionManager transactionManager;
+    private final ILogManager logMgr;
+    private final JobId jobId;
+    private final int datasetId;
+    private final int[] primaryKeyFields;
+    private final boolean isTemporaryDatasetWriteJob;
+    private final boolean isWriteTransaction;
+    // Scratch output of the 128-bit murmur hash; only the first element is consumed.
+    private final long[] longHashes;
+    // Single log record instance that is re-formed for every tuple.
+    private final LogRecord logRecord;
+    private final FrameTupleReference frameTupleReference;
+
+    // Assigned in open().
+    private ITransactionContext transactionContext;
+    // Assigned in setInputRecordDescriptor(); nextFrame() dereferences it without a null check.
+    private FrameTupleAccessor frameTupleAccessor;
+
+    public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
+        this.hyracksTaskCtx = ctx;
+        IAsterixAppRuntimeContext appRuntimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.transactionManager = appRuntimeCtx.getTransactionSubsystem().getTransactionManager();
+        this.logMgr = appRuntimeCtx.getTransactionSubsystem().getLogManager();
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
+        this.isWriteTransaction = isWriteTransaction;
+        this.frameTupleReference = new FrameTupleReference();
+        this.longHashes = new long[2];
+        this.logRecord = new LogRecord();
+    }
+
+    /**
+     * Obtains the transaction context of the job and records whether it is a write transaction.
+     *
+     * @throws HyracksDataException
+     *             wrapping any {@link ACIDException} raised while doing so
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        try {
+            transactionContext = transactionManager.getTransactionContext(jobId, false);
+            transactionContext.setWriteTxn(isWriteTransaction);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Handles every tuple of {@code buffer} in order; see the class comment for the two per-tuple paths. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameTupleAccessor.reset(buffer);
+        final int tupleCount = frameTupleAccessor.getTupleCount();
+        for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
+            if (!isTemporaryDatasetWriteJob) {
+                logEntityCommit(tupleIndex);
+                continue;
+            }
+            /*
+             * Writes over temporary datasets take this path.
+             * A temporary dataset does not require any lock and does not generate any write-ahead
+             * update and commit log, but it does generate a flush log and a job commit log.
+             * It still MUST guarantee the no-steal policy, so this notification has to reach the
+             * PrimaryIndexOptracker, which uses it to decrement its count of active operations.
+             * Keeping that count correct and only allowing a flush when it is 0 guarantees the
+             * no-steal policy for temporary datasets, too.
+             */
+            transactionContext.notifyOptracker(false);
+        }
+    }
+
+    /** Forms the entity-commit log record for the tuple at {@code tupleIndex} and passes it to the log manager. */
+    private void logEntityCommit(int tupleIndex) throws HyracksDataException {
+        frameTupleReference.reset(frameTupleAccessor, tupleIndex);
+        int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
+        logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
+                primaryKeyFields);
+        try {
+            logMgr.log(logRecord);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Hashes the primary-key fields of {@code tuple} and returns the absolute value of the low 32 bits of
+     * the first hash word.
+     */
+    private int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
+        MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
+        int truncated = (int) longHashes[0];
+        // NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative. Left as is because the value
+        // presumably has to agree with wherever else this hash is computed - confirm before changing.
+        return Math.abs(truncated);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // Intentionally empty.
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // Intentionally empty.
+    }
+
+    /** Always throws {@link IllegalStateException}; no output writer can be set on this runtime. */
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        throw new IllegalStateException();
+    }
+
+    /** Creates the tuple accessor used by {@link #nextFrame(ByteBuffer)} from the input record descriptor. */
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
new file mode 100644
index 0000000..768cdb6
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory that captures the parameters of a commit and creates a {@link CommitRuntime} from them
+ * for a given task context.
+ */
+public class CommitRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+    private final int datasetId;
+    // Stored by reference (no copy) and handed unchanged to every runtime that is created.
+    private final int[] primaryKeyFields;
+    private final boolean isTemporaryDatasetWriteJob;
+    private final boolean isWriteTransaction;
+
+    /**
+     * All arguments are stored as given and passed on to
+     * {@link CommitRuntime#CommitRuntime(IHyracksTaskContext, JobId, int, int[], boolean, boolean)}.
+     */
+    public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
+            boolean isWriteTransaction) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+        this.isWriteTransaction = isWriteTransaction;
+        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
+    }
+
+    @Override
+    public String toString() {
+        return "commit";
+    }
+
+    /** Creates a new {@link CommitRuntime} bound to {@code ctx} on every call. */
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        IPushRuntime commitRuntime = new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields,
+                isTemporaryDatasetWriteJob, isWriteTransaction);
+        return commitRuntime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
new file mode 100644
index 0000000..dcbc70c
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.external.indexing.dataflow.HDFSLookupAdapterFactory;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.declared.DatasetDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Physical operator that contributes the runtime built by
+ * {@link HDFSLookupAdapterFactory#buildExternalDataLookupRuntime} for an {@link ExternalDataLookupOperator}.
+ */
+public class ExternalDataLookupPOperator extends AbstractScanPOperator {
+
+    private final List<LogicalVariable> ridVarList;
+    private AqlSourceId datasetId;
+    private Dataset dataset;
+    private ARecordType recordType;
+    private final Index secondaryIndex;
+    private final boolean requiresBroadcast;
+    private final boolean retainInput;
+    private final boolean retainNull;
+
+    /**
+     * @param datasetId
+     *            source id of the dataset; also supplies the dataverse and datasource names
+     * @param dataset
+     *            the dataset handed to the lookup runtime
+     * @param recordType
+     *            record type used when computing the delivered properties
+     * @param secondaryIndex
+     *            the index handed to the lookup runtime
+     * @param ridVarList
+     *            variables that are resolved to field positions in the first input schema
+     * @param requiresBroadcast
+     *            whether the input must be broadcast
+     * @param retainInput
+     *            whether the output consists of the live variables instead of only the produced ones
+     * @param retainNull
+     *            flag forwarded unchanged to the lookup runtime
+     */
+    public ExternalDataLookupPOperator(AqlSourceId datasetId, Dataset dataset, ARecordType recordType,
+            Index secondaryIndex, List<LogicalVariable> ridVarList, boolean requiresBroadcast, boolean retainInput,
+            boolean retainNull) {
+        this.datasetId = datasetId;
+        this.dataset = dataset;
+        this.recordType = recordType;
+        this.secondaryIndex = secondaryIndex;
+        this.ridVarList = ridVarList;
+        this.requiresBroadcast = requiresBroadcast;
+        this.retainInput = retainInput;
+        this.retainNull = retainNull;
+    }
+
+    public Dataset getDataset() {
+        return dataset;
+    }
+
+    public void setDataset(Dataset dataset) {
+        this.dataset = dataset;
+    }
+
+    public ARecordType getRecordType() {
+        return recordType;
+    }
+
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    public AqlSourceId getDatasetId() {
+        return datasetId;
+    }
+
+    public void setDatasetId(AqlSourceId datasetId) {
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTERNAL_LOOKUP;
+    }
+
+    /**
+     * Delivers the properties that an external-dataset data source computes for the variables of the
+     * scan operator {@code op}.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AqlDataSource ds = new DatasetDataSource(datasetId, datasetId.getDataverseName(), datasetId.getDatasourceName(),
+                recordType, AqlDataSourceType.EXTERNAL_DATASET);
+        IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
+        AbstractScanOperator as = (AbstractScanOperator) op;
+        deliveredProperties = dspp.computePropertiesVector(as.getVariables());
+    }
+
+    /**
+     * Builds the external data lookup runtime and wires it into the job.
+     * <p>
+     * Nothing is contributed when the operator's expression is a function call other than
+     * {@code EXTERNAL_LOOKUP}.
+     *
+     * @throws IllegalStateException
+     *             if the expression of {@code op} is not a function call
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        ExternalDataLookupOperator edabro = (ExternalDataLookupOperator) op;
+        ILogicalExpression expr = edabro.getExpressionRef().getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            // Carry the offending tag so that a malformed plan can be diagnosed from the stack trace.
+            throw new IllegalStateException("External data lookup expects a function-call expression but found "
+                    + expr.getExpressionTag());
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        FunctionIdentifier funcIdent = funcExpr.getFunctionIdentifier();
+        if (!funcIdent.equals(AsterixBuiltinFunctions.EXTERNAL_LOOKUP)) {
+            return;
+        }
+        int[] ridIndexes = getKeyIndexes(ridVarList, inputSchemas);
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        // With retain-input the output consists of all live variables, otherwise only of the
+        // variables the lookup operator itself produces.
+        List<LogicalVariable> outputVars = new ArrayList<LogicalVariable>();
+        if (retainInput) {
+            VariableUtilities.getLiveVariables(edabro, outputVars);
+        } else {
+            VariableUtilities.getProducedVariables(edabro, outputVars);
+        }
+
+        AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> externalLookup = HDFSLookupAdapterFactory
+                .buildExternalDataLookupRuntime(builder.getJobSpec(), dataset, secondaryIndex, ridIndexes, retainInput,
+                        typeEnv, outputVars, opSchema, context, metadataProvider, retainNull);
+        builder.contributeHyracksOperator(edabro, externalLookup.first);
+        builder.contributeAlgebricksPartitionConstraint(externalLookup.first, externalLookup.second);
+        ILogicalOperator srcExchange = edabro.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(srcExchange, 0, edabro, 0);
+    }
+
+    /**
+     * Resolves each variable of {@code keyVarList} to the value {@code findVariable} returns for it in the
+     * first input schema.
+     *
+     * @return one entry per variable, in list order, or {@code null} when {@code keyVarList} is {@code null}
+     */
+    protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
+        if (keyVarList == null) {
+            return null;
+        }
+        int[] keyIndexes = new int[keyVarList.size()];
+        for (int i = 0; i < keyVarList.size(); i++) {
+            keyIndexes[i] = inputSchemas[0].findVariable(keyVarList.get(i));
+        }
+        return keyIndexes;
+    }
+
+    /**
+     * Requires a broadcast partitioning of the input when this operator was created with
+     * {@code requiresBroadcast}; otherwise defers to the superclass.
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (!requiresBroadcast) {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+        pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
new file mode 100644
index 0000000..1793407
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+/**
+ * Class that embodies the commonalities between access method physical operators.
+ */
+public abstract class IndexSearchPOperator extends AbstractScanPOperator {
+
+    /** Index handle given at construction; its data source supplies the delivered properties. */
+    protected final IDataSourceIndex<String, AqlSourceId> idx;
+    /** When {@code true}, the input of this operator is required to be broadcast. */
+    protected final boolean requiresBroadcast;
+
+    /**
+     * @param idx
+     *            the index being searched
+     * @param requiresBroadcast
+     *            whether the input must be broadcast (see
+     *            {@link #getRequiredPropertiesForChildren(ILogicalOperator, IPhysicalPropertiesVector)})
+     */
+    public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+        this.idx = idx;
+        this.requiresBroadcast = requiresBroadcast;
+    }
+
+    /**
+     * Always reports {@code false} for index searches.
+     */
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    /**
+     * Sets {@code deliveredProperties} to the properties vector that the index's data source
+     * computes for the variables of the scan operator.
+     *
+     * @param op
+     *            the logical operator; it is cast to {@link AbstractScanOperator}
+     * @param context
+     *            the optimization context (not used here)
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        IDataSource<?> ds = idx.getDataSource();
+        IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
+        AbstractScanOperator as = (AbstractScanOperator) op;
+        deliveredProperties = dspp.computePropertiesVector(as.getVariables());
+    }
+
+    /**
+     * Maps each key variable to the value returned by {@code findVariable} on the schema of the
+     * first input.
+     *
+     * @param keyVarList
+     *            variables to look up; may be {@code null}
+     * @param inputSchemas
+     *            schemas of the operator's inputs; only element 0 is consulted
+     * @return one entry per variable, in list order, or {@code null} when {@code keyVarList} is
+     *         {@code null}
+     */
+    protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
+        if (keyVarList == null) {
+            return null;
+        }
+        int[] keyIndexes = new int[keyVarList.size()];
+        for (int i = 0; i < keyVarList.size(); i++) {
+            keyIndexes[i] = inputSchemas[0].findVariable(keyVarList.get(i));
+        }
+        return keyIndexes;
+    }
+
+    /**
+     * Computes the physical properties this operator requires from its input.
+     * <p>
+     * With {@link #requiresBroadcast} set, a single requirement asking for a
+     * {@link BroadcastPartitioningProperty} is returned, with no coordination between inputs.
+     * Otherwise the decision is delegated to the superclass.
+     *
+     * @param op
+     *            the logical operator this physical operator is attached to
+     * @param reqdByParent
+     *            the properties required by the parent; only forwarded to the superclass
+     * @return the requirements for the operator's input
+     */
+    // Marked as an override so that a signature change in the superclass is caught at compile time.
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (requiresBroadcast) {
+            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+            pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+            return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        } else {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+    }
+
+    /**
+     * Always reports {@code true} for index searches.
+     */
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
new file mode 100644
index 0000000..12b2723
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
+import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
+import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+
+/**
+ * Contributes the runtime operator for an unnest-map representing an
+ * inverted-index search.
+ */
+public class InvertedIndexPOperator extends IndexSearchPOperator {
+    /** Selects the length-partitioned variant: operator tag, token layout and dataflow helper. */
+    private final boolean isPartitioned;
+
+    /**
+     * @param idx
+     *            the index being searched, forwarded to {@link IndexSearchPOperator}
+     * @param requiresBroadcast
+     *            forwarded to {@link IndexSearchPOperator}
+     * @param isPartitioned
+     *            {@code true} for a length-partitioned inverted index, {@code false} for a
+     *            single-partition one
+     */
+    public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+            boolean isPartitioned) {
+        super(idx, requiresBroadcast);
+        this.isPartitioned = isPartitioned;
+    }
+
+    /**
+     * @return {@link PhysicalOperatorTag#LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH} for a partitioned
+     *         index, {@link PhysicalOperatorTag#SINGLE_PARTITION_INVERTED_INDEX_SEARCH} otherwise
+     */
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        if (isPartitioned) {
+            return PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH;
+        } else {
+            return PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH;
+        }
+    }
+
+    /**
+     * Builds the inverted-index search operator for the given unnest-map and registers it, its
+     * partition constraint and the edge from its input with the job builder.
+     * <p>
+     * Nothing is contributed when the unnest expression is a function call other than
+     * {@code INDEX_SEARCH}.
+     *
+     * @param op
+     *            the logical operator; it is cast to {@link UnnestMapOperator}
+     * @throws IllegalStateException
+     *             if the unnest expression is not a function call
+     * @throws AlgebricksException
+     *             if the dataset lookup fails or the runtime cannot be built
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        UnnestMapOperator unnestMapOp = (UnnestMapOperator) op;
+        ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+        if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new IllegalStateException();
+        }
+        AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr;
+        // Compare by value rather than by reference, as RTreeSearchPOperator does: an equal but
+        // non-identical identifier must not make this operator silently contribute nothing.
+        // The constant is the receiver so that a null identifier still takes the early return.
+        if (!AsterixBuiltinFunctions.INDEX_SEARCH.equals(unnestFuncExpr.getFunctionIdentifier())) {
+            return;
+        }
+        InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams();
+        jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
+
+        AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        Dataset dataset;
+        try {
+            dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
+                    jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+        int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
+
+        // These are null when the unnest-map carries no min/max filter variables.
+        int[] minFilterFieldIndexes = getKeyIndexes(unnestMapOp.getMinFilterVars(), inputSchemas);
+        int[] maxFilterFieldIndexes = getKeyIndexes(unnestMapOp.getMaxFilterVars(), inputSchemas);
+        // Build runtime.
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch = buildInvertedIndexRuntime(
+                metadataProvider, context, builder.getJobSpec(), unnestMapOp, opSchema, jobGenParams.getRetainInput(),
+                jobGenParams.getRetainNull(), jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(),
+                jobGenParams.getSearchKeyType(), keyIndexes, jobGenParams.getSearchModifierType(),
+                jobGenParams.getSimilarityThreshold(), minFilterFieldIndexes, maxFilterFieldIndexes);
+
+        // Contribute operator in hyracks job.
+        builder.contributeHyracksOperator(unnestMapOp, invIndexSearch.first);
+        builder.contributeAlgebricksPartitionConstraint(invIndexSearch.first, invIndexSearch.second);
+        ILogicalOperator srcExchange = unnestMapOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(srcExchange, 0, unnestMapOp, 0);
+    }
+
+    /**
+     * Creates the {@link LSMInvertedIndexSearchOperatorDescriptor} for a search on the named
+     * secondary index, together with the partition constraint of that index's file splits.
+     *
+     * @param unnestMap
+     *            the unnest-map being compiled; supplies the type environment and output variables
+     * @param retainInput
+     *            when {@code true}, the live variables of {@code unnestMap} are used as output
+     *            variables instead of the variables it produces
+     * @param datasetName
+     *            dataset name used for the split lookup and in error messages
+     * @param indexName
+     *            name of the secondary inverted index
+     * @param keyFields
+     *            positions of the search key fields; only element 0 is used
+     * @param similarityThreshold
+     *            threshold constant; it is cast to {@link AsterixConstantValue}
+     * @param minFilterFieldIndexes
+     *            forwarded unchanged to the search operator descriptor
+     * @param maxFilterFieldIndexes
+     *            forwarded unchanged to the search operator descriptor
+     * @return the search operator descriptor and its partition constraint
+     * @throws AlgebricksException
+     *             if the index does not exist, does not have exactly one key field, the item type
+     *             is not a record, the key field type cannot be resolved, or a metadata lookup fails
+     */
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInvertedIndexRuntime(
+            AqlMetadataProvider metadataProvider, JobGenContext context, JobSpecification jobSpec,
+            UnnestMapOperator unnestMap, IOperatorSchema opSchema, boolean retainInput, boolean retainNull,
+            String datasetName, Dataset dataset, String indexName, ATypeTag searchKeyType, int[] keyFields,
+            SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold,
+            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
+
+        try {
+            IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
+            IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                    dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
+            int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                    dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+            if (secondaryIndex == null) {
+                throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+                        + datasetName);
+            }
+            List<List<String>> secondaryKeyFieldEntries = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
+            int numSecondaryKeys = secondaryKeyFieldEntries.size();
+            // Exactly one indexed field is supported.
+            if (numSecondaryKeys != 1) {
+                throw new AlgebricksException(
+                        "Cannot use "
+                                + numSecondaryKeys
+                                + " fields as a key for an inverted index. There can be only one field as a key for the inverted index index.");
+            }
+            if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                throw new AlgebricksException("Only record types can be indexed.");
+            }
+            ARecordType recordType = (ARecordType) itemType;
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
+                    secondaryKeyFieldEntries.get(0), recordType);
+            IAType secondaryKeyType = keyPairType.first;
+            if (secondaryKeyType == null) {
+                throw new AlgebricksException("Could not find field " + secondaryKeyFieldEntries.get(0)
+                        + " in the schema.");
+            }
+
+            // TODO: For now we assume the type of the generated tokens is the
+            // same as the indexed field.
+            // We need a better way of expressing this because tokens may be
+            // hashed, or an inverted-index may index a list type, etc.
+            // A partitioned index carries one extra token key (see below).
+            int numTokenKeys = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenKeys];
+            IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenKeys];
+            for (int i = 0; i < numSecondaryKeys; i++) {
+                tokenComparatorFactories[i] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+                tokenTypeTraits[i] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+            }
+            if (isPartitioned) {
+                // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+                tokenComparatorFactories[numSecondaryKeys] = PointableBinaryComparatorFactory
+                        .of(ShortPointable.FACTORY);
+                tokenTypeTraits[numSecondaryKeys] = ShortPointable.TYPE_TRAITS;
+            }
+
+            IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
+            List<LogicalVariable> outputVars = unnestMap.getVariables();
+            if (retainInput) {
+                outputVars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getLiveVariables(unnestMap, outputVars);
+            }
+            RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+
+            // The inverted-list fields are taken from the last numPrimaryKeys output variables.
+            int start = outputRecDesc.getFieldCount() - numPrimaryKeys;
+            IBinaryComparatorFactory[] invListsComparatorFactories = JobGenHelper
+                    .variablesToAscBinaryComparatorFactories(outputVars, start, numPrimaryKeys, typeEnv, context);
+            ITypeTraits[] invListsTypeTraits = JobGenHelper.variablesToTypeTraits(outputVars, start, numPrimaryKeys,
+                    typeEnv, context);
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
+                    dataset, recordType, context.getBinaryComparatorFactoryProvider());
+
+            // The field layouts below stay null unless filter type traits were computed.
+            int[] filterFields = null;
+            int[] invertedIndexFields = null;
+            int[] filterFieldsForNonBulkLoadOps = null;
+            int[] invertedIndexFieldsForNonBulkLoadOps = null;
+            if (filterTypeTraits != null) {
+                // Token and primary keys occupy positions 0..n-1; the filter field follows them.
+                filterFields = new int[1];
+                filterFields[0] = numTokenKeys + numPrimaryKeys;
+                invertedIndexFields = new int[numTokenKeys + numPrimaryKeys];
+                for (int k = 0; k < invertedIndexFields.length; k++) {
+                    invertedIndexFields[k] = k;
+                }
+
+                // Same layout, but counted with numSecondaryKeys instead of numTokenKeys.
+                filterFieldsForNonBulkLoadOps = new int[1];
+                filterFieldsForNonBulkLoadOps[0] = numPrimaryKeys + numSecondaryKeys;
+                invertedIndexFieldsForNonBulkLoadOps = new int[numPrimaryKeys + numSecondaryKeys];
+                for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
+                    invertedIndexFieldsForNonBulkLoadOps[k] = k;
+                }
+            }
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, indexName,
+                            dataset.getDatasetDetails().isTemp());
+            // TODO: Here we assume there is only one search key field.
+            int queryField = keyFields[0];
+            // Get tokenizer and search modifier factories.
+            IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod
+                    .getSearchModifierFactory(searchModifierType, simThresh, secondaryIndex);
+            IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(
+                    searchModifierType, searchKeyType, secondaryIndex);
+            IIndexDataflowHelperFactory dataflowHelperFactory;
+
+            AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+                    dataset, metadataProvider.getMetadataTxnContext());
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            // Both helper factories receive the same arguments; only the factory class differs.
+            if (!isPartitioned) {
+                dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
+                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+                        invertedIndexFieldsForNonBulkLoadOps, !temp);
+            } else {
+                dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+                        invertedIndexFieldsForNonBulkLoadOps, !temp);
+            }
+            LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
+                    jobSpec, queryField, appContext.getStorageManagerInterface(), secondarySplitsAndConstraint.first,
+                    appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                    invListsTypeTraits, invListsComparatorFactories, dataflowHelperFactory, queryTokenizerFactory,
+                    searchModifierFactory, outputRecDesc, retainInput, retainNull, context.getNullWriterFactory(),
+                    NoOpOperationCallbackFactory.INSTANCE, minFilterFieldIndexes, maxFilterFieldIndexes);
+
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(invIndexSearchOp,
+                    secondarySplitsAndConstraint.second);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
new file mode 100644
index 0000000..33f906c
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.RTreeJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Contributes the runtime operator for an unnest-map representing a RTree
+ * search.
+ */
+public class RTreeSearchPOperator extends IndexSearchPOperator {
+
+    /**
+     * @param idx
+     *            the index being searched, forwarded to {@link IndexSearchPOperator}
+     * @param requiresBroadcast
+     *            forwarded to {@link IndexSearchPOperator}
+     */
+    public RTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+        super(idx, requiresBroadcast);
+    }
+
+    /**
+     * @return {@link PhysicalOperatorTag#RTREE_SEARCH}
+     */
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RTREE_SEARCH;
+    }
+
+    /**
+     * Asks the metadata provider for the R-tree search runtime of the given unnest-map and
+     * registers the resulting operator, its partition constraint and the edge from its input
+     * with the job builder.
+     * <p>
+     * Nothing is contributed when the unnest expression is a function call other than
+     * {@code INDEX_SEARCH}.
+     *
+     * @param op
+     *            the logical operator; it is cast to {@link UnnestMapOperator}
+     * @throws IllegalStateException
+     *             if the unnest expression is not a function call
+     * @throws AlgebricksException
+     *             propagated from the calls made while building the runtime
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        UnnestMapOperator searchOp = (UnnestMapOperator) op;
+        ILogicalExpression searchExpr = searchOp.getExpressionRef().getValue();
+        if (searchExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new IllegalStateException();
+        }
+        AbstractFunctionCallExpression searchCall = (AbstractFunctionCallExpression) searchExpr;
+        FunctionIdentifier calledFunction = searchCall.getFunctionIdentifier();
+        boolean isIndexSearch = calledFunction.equals(AsterixBuiltinFunctions.INDEX_SEARCH);
+        if (!isIndexSearch) {
+            return;
+        }
+
+        // Decode the search parameters carried by the function-call arguments.
+        RTreeJobGenParams params = new RTreeJobGenParams();
+        params.readFromFuncArgs(searchCall.getArguments());
+        int[] searchKeyPositions = getKeyIndexes(params.getKeyVarList(), inputSchemas);
+
+        int[] minFilterPositions = getKeyIndexes(searchOp.getMinFilterVars(), inputSchemas);
+        int[] maxFilterPositions = getKeyIndexes(searchOp.getMaxFilterVars(), inputSchemas);
+
+        AqlMetadataProvider metadata = (AqlMetadataProvider) context.getMetadataProvider();
+        Dataset targetDataset = metadata.findDataset(params.getDataverseName(), params.getDatasetName());
+        IVariableTypeEnvironment typeEnvironment = context.getTypeEnvironment(searchOp);
+
+        // When the input is retained, the live variables replace the operator's own variables.
+        List<LogicalVariable> producedVars = searchOp.getVariables();
+        boolean retainInput = params.getRetainInput();
+        if (retainInput) {
+            producedVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(searchOp, producedVars);
+        }
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtime = metadata.buildRtreeRuntime(
+                builder.getJobSpec(), producedVars, opSchema, typeEnvironment, context, retainInput,
+                params.getRetainNull(), targetDataset, params.getIndexName(), searchKeyPositions,
+                minFilterPositions, maxFilterPositions);
+
+        // Register the operator, its partition constraint and the edge from its input.
+        IOperatorDescriptor searchDescriptor = runtime.first;
+        builder.contributeHyracksOperator(searchOp, searchDescriptor);
+        builder.contributeAlgebricksPartitionConstraint(searchDescriptor, runtime.second);
+        ILogicalOperator inputOp = searchOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(inputOp, 0, searchOp, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java b/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java
new file mode 100644
index 0000000..99e8f25
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.jobgen;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionDescriptorTag;
+import edu.uci.ics.asterix.external.library.ExternalFunctionDescriptorProvider;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.comparisons.ComparisonEvalFactory;
+import edu.uci.ics.asterix.runtime.formats.FormatUtils;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+
+/**
+ * Translates Algebricks logical expressions into the runtime factories (evaluators,
+ * aggregates, running aggregates and unnesting functions) needed during job generation.
+ * <p>
+ * Function descriptors are resolved through the default data format returned by
+ * {@link FormatUtils#getDefaultFormat()}, except for external scalar functions, which are
+ * resolved through {@link ExternalFunctionDescriptorProvider}.
+ * <p>
+ * The class is a stateless singleton; use {@link #INSTANCE}.
+ */
+public class AqlLogicalExpressionJobGen implements ILogicalExpressionJobGen {
+
+    public static final AqlLogicalExpressionJobGen INSTANCE = new AqlLogicalExpressionJobGen();
+
+    private AqlLogicalExpressionJobGen() {
+    }
+
+    /**
+     * Creates the aggregate function factory for the given aggregate call.
+     *
+     * @return the factory for an {@code AGGREGATE} descriptor, or {@code null} when the
+     *         resolved descriptor is a {@code SERIALAGGREGATE} (such functions are created
+     *         through {@link #createSerializableAggregateFunctionFactory} instead)
+     * @throws IllegalStateException
+     *             if the resolved descriptor is neither an aggregate nor a serializable aggregate
+     */
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        IFunctionDescriptor fd = getFunctionDescriptor(expr, env, context);
+        switch (fd.getFunctionDescriptorTag()) {
+            case SERIALAGGREGATE:
+                return null;
+            case AGGREGATE:
+                return fd.createAggregateFunctionFactory(args);
+            default:
+                throw new IllegalStateException("Invalid function descriptor " + fd.getFunctionDescriptorTag()
+                        + " expected " + FunctionDescriptorTag.SERIALAGGREGATE + " or "
+                        + FunctionDescriptorTag.AGGREGATE);
+        }
+    }
+
+    /**
+     * Creates the running-aggregate function factory for the given stateful call.
+     */
+    @Override
+    public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(
+            StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
+            JobGenContext context) throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        return getFunctionDescriptor(expr, env, context).createRunningAggregateFunctionFactory(args);
+    }
+
+    /**
+     * Creates the unnesting function factory for the given unnesting call.
+     */
+    @Override
+    public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        return getFunctionDescriptor(expr, env, context).createUnnestingFunctionFactory(args);
+    }
+
+    /**
+     * Creates an evaluator factory for a variable reference, a constant or a scalar function call.
+     *
+     * @throws IllegalStateException
+     *             if the expression tag is none of VARIABLE, CONSTANT or FUNCTION_CALL
+     */
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        switch (expr.getExpressionTag()) {
+            case VARIABLE:
+                return createVariableEvaluatorFactory((VariableReferenceExpression) expr, inputSchemas, context);
+            case CONSTANT:
+                return createConstantEvaluatorFactory((ConstantExpression) expr, inputSchemas, context);
+            case FUNCTION_CALL:
+                return createScalarFunctionEvaluatorFactory((AbstractFunctionCallExpression) expr, env,
+                        inputSchemas, context);
+            default:
+                throw new IllegalStateException("Unexpected expression tag " + expr.getExpressionTag());
+        }
+    }
+
+    /**
+     * Returns a column accessor for the first input schema that contains the referenced variable.
+     *
+     * @throws AlgebricksException
+     *             if no input schema contains the variable
+     */
+    private ICopyEvaluatorFactory createVariableEvaluatorFactory(VariableReferenceExpression expr,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        LogicalVariable variable = expr.getVariableReference();
+        for (IOperatorSchema scm : inputSchemas) {
+            int pos = scm.findVariable(variable);
+            if (pos >= 0) {
+                return new ColumnAccessEvalFactory(pos);
+            }
+        }
+        throw new AlgebricksException("Variable " + variable + " could not be found in any input schema.");
+    }
+
+    /**
+     * Creates the evaluator factory for a scalar function call. Comparison functions are mapped
+     * directly to a {@link ComparisonEvalFactory}; external functions are resolved through
+     * {@link ExternalFunctionDescriptorProvider}; everything else goes through the default format.
+     */
+    private ICopyEvaluatorFactory createScalarFunctionEvaluatorFactory(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        FunctionIdentifier fi = expr.getFunctionIdentifier();
+        ComparisonKind ck = AlgebricksBuiltinFunctions.getComparisonType(fi);
+        if (ck != null) {
+            // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException.
+            if (args.length < 2) {
+                throw new AlgebricksException("Comparison function " + fi + " expects two arguments but got "
+                        + args.length + ".");
+            }
+            return new ComparisonEvalFactory(args[0], args[1], ck);
+        }
+
+        IFunctionDescriptor fd;
+        if (expr.getFunctionInfo() instanceof IExternalFunctionInfo) {
+            try {
+                fd = ExternalFunctionDescriptorProvider.getExternalFunctionDescriptor((IExternalFunctionInfo) expr
+                        .getFunctionInfo());
+            } catch (AsterixException ae) {
+                throw new AlgebricksException(ae);
+            }
+        } else {
+            fd = getFunctionDescriptor(expr, env, context);
+        }
+        return fd.createEvaluatorFactory(args);
+    }
+
+    /**
+     * Creates the evaluator factory that produces the constant's value, using the default format.
+     */
+    private ICopyEvaluatorFactory createConstantEvaluatorFactory(ConstantExpression expr,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        IDataFormat format = FormatUtils.getDefaultFormat();
+        return format.getConstantEvalFactory(expr.getValue());
+    }
+
+    /**
+     * Creates one evaluator factory per argument of the call, in argument order.
+     */
+    private ICopyEvaluatorFactory[] codegenArguments(AbstractFunctionCallExpression expr, IVariableTypeEnvironment env,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> arguments = expr.getArguments();
+        ICopyEvaluatorFactory[] args = new ICopyEvaluatorFactory[arguments.size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> a : arguments) {
+            args[i++] = createEvaluatorFactory(a.getValue(), env, inputSchemas, context);
+        }
+        return args;
+    }
+
+    /**
+     * Creates the serializable aggregate function factory for the given aggregate call. A plain
+     * {@code AGGREGATE} descriptor is first rewritten to its serializable counterpart when
+     * {@link AsterixBuiltinFunctions#isAggregateFunctionSerializable} reports that one exists.
+     *
+     * @throws AlgebricksException
+     *             if the aggregate has no serializable counterpart
+     * @throws IllegalStateException
+     *             if the resolved descriptor is neither an aggregate nor a serializable aggregate
+     */
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
+            JobGenContext context) throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        IFunctionDescriptor fd = getFunctionDescriptor(expr, env, context);
+
+        switch (fd.getFunctionDescriptorTag()) {
+            case AGGREGATE: {
+                if (!AsterixBuiltinFunctions.isAggregateFunctionSerializable(fd.getIdentifier())) {
+                    throw new AlgebricksException(
+                            "Trying to create a serializable aggregate from a non-serializable aggregate function descriptor. (fi="
+                                    + expr.getFunctionIdentifier() + ")");
+                }
+                AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
+                        .makeSerializableAggregateFunctionExpression(fd.getIdentifier(), expr.getArguments());
+                IFunctionDescriptor afdd = getFunctionDescriptor(serialAggExpr, env, context);
+                return afdd.createSerializableAggregateFunctionFactory(args);
+            }
+            case SERIALAGGREGATE: {
+                return fd.createSerializableAggregateFunctionFactory(args);
+            }
+
+            default:
+                throw new IllegalStateException("Invalid function descriptor " + fd.getFunctionDescriptorTag()
+                        + " expected " + FunctionDescriptorTag.SERIALAGGREGATE + " or "
+                        + FunctionDescriptorTag.AGGREGATE);
+        }
+    }
+
+    /**
+     * Resolves the descriptor of a built-in function call through the default data format.
+     */
+    private IFunctionDescriptor getFunctionDescriptor(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+        return FormatUtils.getDefaultFormat().resolveFunction(expr, env);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
new file mode 100644
index 0000000..2d3ca0a
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+
+/**
+ * Static helpers used by the optimizer to inspect logical operators and expressions.
+ */
+public class AnalysisUtil {
+
+    /** Functions reported as "runnable" field accesses by {@link #isRunnableFieldAccessFunction}. */
+    private static final List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<FunctionIdentifier>();
+    static {
+        fieldAccessFunctions.add(AsterixBuiltinFunctions.GET_DATA);
+        fieldAccessFunctions.add(AsterixBuiltinFunctions.GET_HANDLE);
+        fieldAccessFunctions.add(AsterixBuiltinFunctions.TYPE_OF);
+    }
+
+    /**
+     * Returns the first input of {@code op} if that input's operator tag is {@code opType}.
+     *
+     * @return the first child, or {@code null} if {@code op} has no inputs or the first input
+     *         has a different operator tag
+     */
+    public final static ILogicalOperator firstChildOfType(AbstractLogicalOperator op, LogicalOperatorTag opType) {
+        List<Mutable<ILogicalOperator>> ins = op.getInputs();
+        if (ins == null || ins.isEmpty()) {
+            return null;
+        }
+        AbstractLogicalOperator firstChild = (AbstractLogicalOperator) ins.get(0).getValue();
+        return firstChild.getOperatorTag() == opType ? firstChild : null;
+    }
+
+    /**
+     * Counts the variable references in {@code e}, recursing into function-call arguments.
+     * Repeated references to the same variable are counted each time.
+     *
+     * @throws IllegalArgumentException
+     *             if the expression (or a nested argument) is neither a constant, a function
+     *             call nor a variable reference
+     */
+    public static int numberOfVarsInExpr(ILogicalExpression e) {
+        switch (e.getExpressionTag()) {
+            case CONSTANT: {
+                return 0;
+            }
+            case FUNCTION_CALL: {
+                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) e;
+                int s = 0;
+                for (Mutable<ILogicalExpression> arg : f.getArguments()) {
+                    s += numberOfVarsInExpr(arg.getValue());
+                }
+                return s;
+            }
+            case VARIABLE: {
+                return 1;
+            }
+            default: {
+                // Always fail the same way, whether or not JVM assertions are enabled.
+                throw new IllegalArgumentException("Unexpected expression tag " + e.getExpressionTag());
+            }
+        }
+    }
+
+    /**
+     * Tells whether {@code fid} is one of GET_DATA, GET_HANDLE or TYPE_OF.
+     */
+    public static boolean isRunnableFieldAccessFunction(FunctionIdentifier fid) {
+        return fieldAccessFunctions.contains(fid);
+    }
+
+    /**
+     * Tells whether {@code e} is a function call that
+     * {@link AsterixBuiltinFunctions#isDatasetFunction} recognizes as a dataset function.
+     */
+    public static boolean isDataSetCall(ILogicalExpression e) {
+        if (e.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression fe = (AbstractFunctionCallExpression) e;
+        return AsterixBuiltinFunctions.isDatasetFunction(fe.getFunctionIdentifier());
+    }
+
+    /**
+     * Tells whether {@code expr} is a call to a function accepted by
+     * {@link #isRunnableFieldAccessFunction}.
+     */
+    public static boolean isRunnableAccessToFieldRecord(ILogicalExpression expr) {
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        FunctionIdentifier fid = ((AbstractFunctionCallExpression) expr).getFunctionIdentifier();
+        return isRunnableFieldAccessFunction(fid);
+    }
+
+    /**
+     * Tells whether {@code expr} is a call to FIELD_ACCESS_BY_NAME.
+     */
+    public static boolean isAccessByNameToFieldRecord(ILogicalExpression expr) {
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        FunctionIdentifier fid = ((AbstractFunctionCallExpression) expr).getFunctionIdentifier();
+        return fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME);
+    }
+
+    /**
+     * Tells whether {@code expr} is a call to FIELD_ACCESS_BY_INDEX, FIELD_ACCESS_BY_NAME or
+     * FIELD_ACCESS_NESTED.
+     */
+    public static boolean isAccessToFieldRecord(ILogicalExpression expr) {
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        FunctionIdentifier fid = ((AbstractFunctionCallExpression) expr).getFunctionIdentifier();
+        return fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX)
+                || fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME)
+                || fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED);
+    }
+
+    /**
+     * Returns the (dataverse name, datasource name) pair read from the {@link AqlSourceId} of the
+     * operator's data source.
+     */
+    public static Pair<String, String> getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
+        AqlSourceId srcId = (AqlSourceId) op.getDataSource().getId();
+        return new Pair<String, String>(srcId.getDataverseName(), srcId.getDatasourceName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
new file mode 100644
index 0000000..063c887
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.base;
+
+import java.util.ArrayList;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class FuzzyUtils {
+
+    private final static String DEFAULT_SIM_FUNCTION = "jaccard";
+    private final static float JACCARD_DEFAULT_SIM_THRESHOLD = .8f;
+    private final static int EDIT_DISTANCE_DEFAULT_SIM_THRESHOLD = 1;
+
+    private final static String SIM_FUNCTION_PROP_NAME = "simfunction";
+    private final static String SIM_THRESHOLD_PROP_NAME = "simthreshold";
+
+    public final static String JACCARD_FUNCTION_NAME = "jaccard";
+    public final static String EDIT_DISTANCE_FUNCTION_NAME = "edit-distance";
+
+    public static FunctionIdentifier getTokenizer(ATypeTag inputTag) {
+        switch (inputTag) {
+            case STRING:
+                return AsterixBuiltinFunctions.COUNTHASHED_WORD_TOKENS;
+            case UNORDEREDLIST:
+            case ORDEREDLIST:
+            case ANY:
+                return null;
+            default:
+                throw new NotImplementedException("No tokenizer for type " + inputTag);
+        }
+    }
+
+    public static IAObject getSimThreshold(AqlMetadataProvider metadata, String simFuncName) {
+        String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        IAObject ret = null;
+        if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
+            if (simThresholValue != null) {
+                float jaccThresh = Float.parseFloat(simThresholValue);
+                ret = new AFloat(jaccThresh);
+            } else {
+                ret = new AFloat(JACCARD_DEFAULT_SIM_THRESHOLD);
+            }
+        } else if (simFuncName.equals(EDIT_DISTANCE_FUNCTION_NAME)) {
+            if (simThresholValue != null) {
+                int edThresh = Integer.parseInt(simThresholValue);
+                ret = new AInt32(edThresh);
+            } else {
+                ret = new AFloat(EDIT_DISTANCE_DEFAULT_SIM_THRESHOLD);
+            }
+        }
+        return ret;
+    }
+
+    public static FunctionIdentifier getFunctionIdentifier(String simFuncName) {
+        if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
+            return AsterixBuiltinFunctions.SIMILARITY_JACCARD;
+        } else if (simFuncName.equals(EDIT_DISTANCE_FUNCTION_NAME)) {
+            return AsterixBuiltinFunctions.EDIT_DISTANCE;
+        }
+        return null;
+    }
+
+    public static ScalarFunctionCallExpression getComparisonExpr(String simFuncName,
+            ArrayList<Mutable<ILogicalExpression>> cmpArgs) {
+        if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
+            return new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.GE),
+                    cmpArgs);
+        } else if (simFuncName.equals(EDIT_DISTANCE_FUNCTION_NAME)) {
+            return new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.LE),
+                    cmpArgs);
+        }
+        return null;
+    }
+
+    public static float getSimThreshold(AqlMetadataProvider metadata) {
+        float simThreshold = JACCARD_DEFAULT_SIM_THRESHOLD;
+        String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        if (simThresholValue != null) {
+            simThreshold = Float.parseFloat(simThresholValue);
+        }
+        return simThreshold;
+    }
+
+    // TODO: The default function depend on the input types. 
+    public static String getSimFunction(AqlMetadataProvider metadata) {
+        String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME);
+        if (simFunction == null) {
+            simFunction = DEFAULT_SIM_FUNCTION;
+        }
+        simFunction = simFunction.toLowerCase();
+        return simFunction;
+    }
+}