You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:44:23 UTC
[35/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
new file mode 100644
index 0000000..5a4b52a
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Contributes the runtime operator for an unnest-map representing a BTree search.
+ * <p>
+ * Child requirements: when {@code requiresBroadcast} is set, the input is broadcast, except for an
+ * equality search on a primary index, where the input is instead required to be partitioned on the
+ * search-key variables and locally sorted on them in ascending order. Otherwise the superclass decides.
+ */
+public class BTreeSearchPOperator extends IndexSearchPOperator {
+
+    private final List<LogicalVariable> lowKeyVarList;
+    private final List<LogicalVariable> highKeyVarList;
+    private final boolean isPrimaryIndex;
+    private final boolean isEqCondition;
+    // Opaque configuration handed unchanged to AqlMetadataProvider.buildBtreeRuntime.
+    private Object implConfig;
+
+    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+            boolean isPrimaryIndex, boolean isEqCondition, List<LogicalVariable> lowKeyVarList,
+            List<LogicalVariable> highKeyVarList) {
+        super(idx, requiresBroadcast);
+        this.lowKeyVarList = lowKeyVarList;
+        this.highKeyVarList = highKeyVarList;
+        this.isPrimaryIndex = isPrimaryIndex;
+        this.isEqCondition = isEqCondition;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.BTREE_SEARCH;
+    }
+
+    public Object getImplConfig() {
+        return implConfig;
+    }
+
+    public void setImplConfig(Object implConfig) {
+        this.implConfig = implConfig;
+    }
+
+    /**
+     * Builds the BTree search runtime for an INDEX_SEARCH unnest-map and connects it to the map's input.
+     * Unnest-maps over any other function are left untouched (nothing is contributed).
+     *
+     * @throws IllegalStateException if the unnest expression is not a function call
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        UnnestMapOperator unnestMapOp = (UnnestMapOperator) op;
+        ILogicalExpression expr = unnestMapOp.getExpressionRef().getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new IllegalStateException();
+        }
+        AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr;
+        FunctionIdentifier fid = callExpr.getFunctionIdentifier();
+        if (!fid.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+            return;
+        }
+
+        // Decode the search parameters that were encoded as arguments of the index-search call.
+        BTreeJobGenParams params = new BTreeJobGenParams();
+        params.readFromFuncArgs(callExpr.getArguments());
+        int[] lowKeyFields = getKeyIndexes(params.getLowKeyVarList(), inputSchemas);
+        int[] highKeyFields = getKeyIndexes(params.getHighKeyVarList(), inputSchemas);
+        int[] minFilterFields = getKeyIndexes(unnestMapOp.getMinFilterVars(), inputSchemas);
+        int[] maxFilterFields = getKeyIndexes(unnestMapOp.getMaxFilterVars(), inputSchemas);
+
+        AqlMetadataProvider mdProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        Dataset dataset = mdProvider.findDataset(params.getDataverseName(), params.getDatasetName());
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+
+        // When the input is retained, all live variables of the unnest-map are output,
+        // instead of only the unnest-map's own variables.
+        List<LogicalVariable> outputVars;
+        if (params.getRetainInput()) {
+            outputVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(unnestMapOp, outputVars);
+        } else {
+            outputVars = unnestMapOp.getVariables();
+        }
+
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> searchOp = mdProvider.buildBtreeRuntime(
+                builder.getJobSpec(), outputVars, opSchema, typeEnv, context, params.getRetainInput(),
+                params.getRetainNull(), dataset, params.getIndexName(), lowKeyFields, highKeyFields,
+                params.isLowKeyInclusive(), params.isHighKeyInclusive(), implConfig, minFilterFields,
+                maxFilterFields);
+        IOperatorDescriptor searchDesc = searchOp.first;
+        builder.contributeHyracksOperator(unnestMapOp, searchDesc);
+        builder.contributeAlgebricksPartitionConstraint(searchDesc, searchOp.second);
+
+        ILogicalOperator input = unnestMapOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(input, 0, unnestMapOp, 0);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (!requiresBroadcast) {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+        StructuralPropertiesVector childProps;
+        if (isPrimaryIndex && isEqCondition) {
+            // An equality probe into a primary index only needs the tuples whose keys land on a given
+            // partition, so the broadcast requirement can be relaxed to partitioning on the search keys.
+            childProps = partitionedAndSortedOnSearchKeys();
+        } else {
+            childProps = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+        }
+        return new PhysicalRequirements(new StructuralPropertiesVector[] { childProps },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * Builds the child requirement used for primary-index equality searches: partitioned on the
+     * (de-duplicated) low and high search-key variables, and locally sorted ascending on them so that
+     * a sort is enforced before the primary-index operator.
+     */
+    private StructuralPropertiesVector partitionedAndSortedOnSearchKeys() {
+        ListSet<LogicalVariable> keyVars = new ListSet<LogicalVariable>();
+        keyVars.addAll(lowKeyVarList);
+        keyVars.addAll(highKeyVarList);
+        List<OrderColumn> sortColumns = new ArrayList<OrderColumn>();
+        for (LogicalVariable keyVar : keyVars) {
+            sortColumns.add(new OrderColumn(keyVar, OrderKind.ASC));
+        }
+        List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
+        localProps.add(new LocalOrderProperty(sortColumns));
+        return new StructuralPropertiesVector(new UnorderedPartitionedProperty(keyVars, null), localProps);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
new file mode 100644
index 0000000..6722cbe
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Physical operator for the COMMIT micro-operator. It delivers a copy of its input's physical
+ * properties and contributes a {@link CommitRuntimeFactory} configured with the job id, dataset id
+ * and the input positions of the primary-key variables.
+ */
+public class CommitPOperator extends AbstractPhysicalOperator {
+
+    private final JobId jobId;
+    private final int datasetId;
+    private final List<LogicalVariable> primaryKeyLogicalVars;
+
+    public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    @Override
+    public String toString() {
+        return "COMMIT";
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        // Delegates to the base-class helper for a single-input operator.
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        // The commit operator passes its child's delivered properties through (as a clone).
+        ILogicalOperator child = op.getInputs().get(0).getValue();
+        deliveredProperties = ((AbstractLogicalOperator) child).getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        // Positions of the primary-key variables within the (single) input schema.
+        int[] pkFieldIndexes = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
+
+        AqlMetadataProvider mdProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        boolean temporaryDatasetWrite = mdProvider.isTemporaryDatasetWriteJob();
+        boolean writeTxn = mdProvider.isWriteTransaction();
+        CommitRuntimeFactory commitFactory = new CommitRuntimeFactory(jobId, datasetId, pkFieldIndexes,
+                temporaryDatasetWrite, writeTxn);
+        builder.contributeMicroOperator(op, commitFactory, outputRecDesc);
+
+        builder.contributeGraphEdge(op.getInputs().get(0).getValue(), 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
new file mode 100644
index 0000000..69c8f78
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
+
+/**
+ * Push runtime that never forwards frames (it rejects any output writer). For each incoming tuple it
+ * either writes an entity-commit log record keyed by a hash of the tuple's primary-key fields or, for
+ * writes to temporary datasets, only notifies the transaction context's operation tracker.
+ */
+public class CommitRuntime implements IPushRuntime {
+
+    private final static long SEED = 0L;
+
+    private final IHyracksTaskContext hyracksTaskCtx;
+    private final ITransactionManager transactionManager;
+    private final ILogManager logMgr;
+    private final JobId jobId;
+    private final int datasetId;
+    private final int[] primaryKeyFields;
+    private final boolean isTemporaryDatasetWriteJob;
+    private final boolean isWriteTransaction;
+    // Reused output buffer for the 128-bit Murmur hash.
+    private final long[] longHashes;
+    // Reused log record, re-formed for every committed tuple.
+    private final LogRecord logRecord;
+
+    private ITransactionContext transactionContext;
+    private FrameTupleAccessor frameTupleAccessor;
+    private final FrameTupleReference frameTupleReference;
+
+    public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
+        this.hyracksTaskCtx = ctx;
+        IAsterixAppRuntimeContext appCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.transactionManager = appCtx.getTransactionSubsystem().getTransactionManager();
+        this.logMgr = appCtx.getTransactionSubsystem().getLogManager();
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
+        this.isWriteTransaction = isWriteTransaction;
+        this.frameTupleReference = new FrameTupleReference();
+        this.longHashes = new long[2];
+        this.logRecord = new LogRecord();
+    }
+
+    /** Looks up the job's transaction context and records whether this is a write transaction. */
+    @Override
+    public void open() throws HyracksDataException {
+        try {
+            transactionContext = transactionManager.getTransactionContext(jobId, false);
+            transactionContext.setWriteTxn(isWriteTransaction);
+        } catch (ACIDException acidEx) {
+            throw new HyracksDataException(acidEx);
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameTupleAccessor.reset(buffer);
+        final int tupleCount = frameTupleAccessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            if (isTemporaryDatasetWriteJob) {
+                /*
+                 * Writes over temporary datasets take no locks and generate no write-ahead update or
+                 * entity-commit log records (they still generate flush and job-commit logs). They must
+                 * nevertheless respect the no-steal policy, so this notification reaches the
+                 * PrimaryIndexOptracker and decrements its active-operation count; flushing is only
+                 * allowed once that count is 0.
+                 */
+                transactionContext.notifyOptracker(false);
+                continue;
+            }
+            logEntityCommit(i);
+        }
+    }
+
+    /** Forms and writes the entity-commit log record for tuple {@code tupleIndex} of the current frame. */
+    private void logEntityCommit(int tupleIndex) throws HyracksDataException {
+        frameTupleReference.reset(frameTupleAccessor, tupleIndex);
+        int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
+        logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
+                primaryKeyFields);
+        try {
+            logMgr.log(logRecord);
+        } catch (ACIDException acidEx) {
+            throw new HyracksDataException(acidEx);
+        }
+    }
+
+    private int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
+        MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
+        // NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative, so this can (rarely) return a
+        // negative value. Left unchanged: presumably other components hash primary keys the same way
+        // and must agree with this value -- confirm before changing.
+        return Math.abs((int) longHashes[0]);
+    }
+
+    /** No action is taken on failure. */
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+
+    /** No action is taken on close. */
+    @Override
+    public void close() throws HyracksDataException {
+    }
+
+    /** This runtime never produces output, so attaching a downstream writer is rejected. */
+    @Override
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        throw new IllegalStateException();
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
new file mode 100644
index 0000000..768cdb6
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory that creates a new {@link CommitRuntime} on each {@link #createPushRuntime} call, passing
+ * along the job id, dataset id, primary-key field positions and transaction flags it was built with.
+ */
+public class CommitRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+    private final int datasetId;
+    private final int[] primaryKeyFields;
+    private final boolean isTemporaryDatasetWriteJob;
+    private final boolean isWriteTransaction;
+
+    public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
+            boolean isWriteTransaction) {
+        this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
+        this.isWriteTransaction = isWriteTransaction;
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        CommitRuntime commitRuntime = new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields,
+                isTemporaryDatasetWriteJob, isWriteTransaction);
+        return commitRuntime;
+    }
+
+    @Override
+    public String toString() {
+        return "commit";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
new file mode 100644
index 0000000..dcbc70c
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.external.indexing.dataflow.HDFSLookupAdapterFactory;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.declared.DatasetDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Physical operator for an external-data lookup. It resolves the record-id (RID) variables of its input
+ * to field positions and contributes the lookup runtime built by
+ * {@link HDFSLookupAdapterFactory#buildExternalDataLookupRuntime}.
+ */
+public class ExternalDataLookupPOperator extends AbstractScanPOperator {
+
+    private final List<LogicalVariable> ridVarList;
+    private final Index secondaryIndex;
+    private final boolean requiresBroadcast;
+    private final boolean retainInput;
+    private final boolean retainNull;
+    private AqlSourceId datasetId;
+    private Dataset dataset;
+    private ARecordType recordType;
+
+    public ExternalDataLookupPOperator(AqlSourceId datasetId, Dataset dataset, ARecordType recordType,
+            Index secondaryIndex, List<LogicalVariable> ridVarList, boolean requiresBroadcast, boolean retainInput,
+            boolean retainNull) {
+        this.datasetId = datasetId;
+        this.dataset = dataset;
+        this.recordType = recordType;
+        this.secondaryIndex = secondaryIndex;
+        this.ridVarList = ridVarList;
+        this.requiresBroadcast = requiresBroadcast;
+        this.retainInput = retainInput;
+        this.retainNull = retainNull;
+    }
+
+    public Dataset getDataset() {
+        return dataset;
+    }
+
+    public void setDataset(Dataset dataset) {
+        this.dataset = dataset;
+    }
+
+    public ARecordType getRecordType() {
+        return recordType;
+    }
+
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    public AqlSourceId getDatasetId() {
+        return datasetId;
+    }
+
+    public void setDatasetId(AqlSourceId datasetId) {
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTERNAL_LOOKUP;
+    }
+
+    /**
+     * Delivers the properties of the external dataset as a data source, computed over the scan's
+     * variables.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AqlDataSource ds = new DatasetDataSource(datasetId, datasetId.getDataverseName(), datasetId.getDatasourceName(),
+                recordType, AqlDataSourceType.EXTERNAL_DATASET);
+        IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
+        AbstractScanOperator as = (AbstractScanOperator) op;
+        deliveredProperties = dspp.computePropertiesVector(as.getVariables());
+    }
+
+    /**
+     * Builds the lookup runtime for an EXTERNAL_LOOKUP expression and connects it to the operator's input.
+     * Expressions calling any other function are left untouched (nothing is contributed).
+     *
+     * @throws IllegalStateException if the operator's expression is not a function call
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        ExternalDataLookupOperator edabro = (ExternalDataLookupOperator) op;
+        ILogicalExpression expr = edabro.getExpressionRef().getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new IllegalStateException();
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        FunctionIdentifier funcIdent = funcExpr.getFunctionIdentifier();
+        if (!funcIdent.equals(AsterixBuiltinFunctions.EXTERNAL_LOOKUP)) {
+            return;
+        }
+        int[] ridIndexes = getKeyIndexes(ridVarList, inputSchemas);
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        // Retaining the input outputs every live variable; otherwise only the variables this operator produces.
+        List<LogicalVariable> outputVars = new ArrayList<LogicalVariable>();
+        if (retainInput) {
+            VariableUtilities.getLiveVariables(edabro, outputVars);
+        } else {
+            VariableUtilities.getProducedVariables(edabro, outputVars);
+        }
+
+        AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> externalLookup = HDFSLookupAdapterFactory
+                .buildExternalDataLookupRuntime(builder.getJobSpec(), dataset, secondaryIndex, ridIndexes, retainInput,
+                        typeEnv, outputVars, opSchema, context, metadataProvider, retainNull);
+        builder.contributeHyracksOperator(edabro, externalLookup.first);
+        builder.contributeAlgebricksPartitionConstraint(externalLookup.first, externalLookup.second);
+        ILogicalOperator srcExchange = edabro.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(srcExchange, 0, edabro, 0);
+    }
+
+    /**
+     * Maps each key variable to its field position in the first input schema.
+     *
+     * @param keyVarList the key variables, or {@code null}
+     * @param inputSchemas the operator's input schemas; only element 0 is consulted
+     * @return the field positions, in list order, or {@code null} when {@code keyVarList} is {@code null}
+     */
+    protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
+        if (keyVarList == null) {
+            return null;
+        }
+        int[] keyIndexes = new int[keyVarList.size()];
+        for (int i = 0; i < keyVarList.size(); i++) {
+            keyIndexes[i] = inputSchemas[0].findVariable(keyVarList.get(i));
+        }
+        return keyIndexes;
+    }
+
+    /** Requires a broadcast input when {@code requiresBroadcast} is set; otherwise defers to the superclass. */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (requiresBroadcast) {
+            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+            pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+            return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        } else {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
new file mode 100644
index 0000000..1793407
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+/**
+ * Class that embodies the commonalities between access method physical operators.
+ * <p>
+ * It holds the searched index and a flag saying whether the operator's input must be broadcast. It also provides
+ * shared logic for delivered/required physical properties and for resolving key variables to input field
+ * positions.
+ */
+public abstract class IndexSearchPOperator extends AbstractScanPOperator {
+
+    /** The index searched by this operator; its data source supplies the delivered properties. */
+    protected final IDataSourceIndex<String, AqlSourceId> idx;
+    /** When true, the input of this operator is required to be broadcast. */
+    protected final boolean requiresBroadcast;
+
+    public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+        this.idx = idx;
+        this.requiresBroadcast = requiresBroadcast;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    /**
+     * Derives the delivered physical properties from the searched index's data source. It uses the variables
+     * produced by the scan operator {@code op}.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        IDataSource<?> ds = idx.getDataSource();
+        IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
+        AbstractScanOperator as = (AbstractScanOperator) op;
+        deliveredProperties = dspp.computePropertiesVector(as.getVariables());
+    }
+
+    /**
+     * Resolves each key variable to its field position in the first input schema.
+     *
+     * @param keyVarList
+     *            key variables to resolve; may be {@code null}
+     * @param inputSchemas
+     *            schemas of this operator's inputs; only element 0 is consulted
+     * @return one field position per key variable (in list order), or {@code null} when {@code keyVarList} is
+     *         {@code null}
+     */
+    protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
+        if (keyVarList == null) {
+            return null;
+        }
+        int[] keyIndexes = new int[keyVarList.size()];
+        for (int i = 0; i < keyVarList.size(); i++) {
+            keyIndexes[i] = inputSchemas[0].findVariable(keyVarList.get(i));
+        }
+        return keyIndexes;
+    }
+
+    /**
+     * Requires a broadcast partitioning of the single input when {@code requiresBroadcast} is set. Otherwise it
+     * defers to the superclass.
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (requiresBroadcast) {
+            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+            pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+            return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+        } else {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+    }
+
+    /** Index searches always report themselves as more expensive than materialization. */
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
new file mode 100644
index 0000000..12b2723
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
+import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
+import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+
+/**
+ * Contributes the runtime operator for an unnest-map representing an
+ * inverted-index search.
+ */
+public class InvertedIndexPOperator extends IndexSearchPOperator {
+ private final boolean isPartitioned;
+
+ public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+ boolean isPartitioned) {
+ super(idx, requiresBroadcast);
+ this.isPartitioned = isPartitioned;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ if (isPartitioned) {
+ return PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH;
+ } else {
+ return PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH;
+ }
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ UnnestMapOperator unnestMapOp = (UnnestMapOperator) op;
+ ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
+ if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ throw new IllegalStateException();
+ }
+ AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr;
+ if (unnestFuncExpr.getFunctionIdentifier() != AsterixBuiltinFunctions.INDEX_SEARCH) {
+ return;
+ }
+ InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams();
+ jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
+
+ AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+ Dataset dataset;
+ try {
+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
+ jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
+
+ int[] minFilterFieldIndexes = getKeyIndexes(unnestMapOp.getMinFilterVars(), inputSchemas);
+ int[] maxFilterFieldIndexes = getKeyIndexes(unnestMapOp.getMaxFilterVars(), inputSchemas);
+ // Build runtime.
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch = buildInvertedIndexRuntime(
+ metadataProvider, context, builder.getJobSpec(), unnestMapOp, opSchema, jobGenParams.getRetainInput(),
+ jobGenParams.getRetainNull(), jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(),
+ jobGenParams.getSearchKeyType(), keyIndexes, jobGenParams.getSearchModifierType(),
+ jobGenParams.getSimilarityThreshold(), minFilterFieldIndexes, maxFilterFieldIndexes);
+
+ // Contribute operator in hyracks job.
+ builder.contributeHyracksOperator(unnestMapOp, invIndexSearch.first);
+ builder.contributeAlgebricksPartitionConstraint(invIndexSearch.first, invIndexSearch.second);
+ ILogicalOperator srcExchange = unnestMapOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(srcExchange, 0, unnestMapOp, 0);
+ }
+
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInvertedIndexRuntime(
+ AqlMetadataProvider metadataProvider, JobGenContext context, JobSpecification jobSpec,
+ UnnestMapOperator unnestMap, IOperatorSchema opSchema, boolean retainInput, boolean retainNull,
+ String datasetName, Dataset dataset, String indexName, ATypeTag searchKeyType, int[] keyFields,
+ SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold,
+ int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
+
+ try {
+ IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
+ IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+ dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
+ int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+ Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+ dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+ if (secondaryIndex == null) {
+ throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ + datasetName);
+ }
+ List<List<String>> secondaryKeyFieldEntries = secondaryIndex.getKeyFieldNames();
+ List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
+ int numSecondaryKeys = secondaryKeyFieldEntries.size();
+ if (numSecondaryKeys != 1) {
+ throw new AlgebricksException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for an inverted index. There can be only one field as a key for the inverted index index.");
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recordType = (ARecordType) itemType;
+ Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
+ secondaryKeyFieldEntries.get(0), recordType);
+ IAType secondaryKeyType = keyPairType.first;
+ if (secondaryKeyType == null) {
+ throw new AlgebricksException("Could not find field " + secondaryKeyFieldEntries.get(0)
+ + " in the schema.");
+ }
+
+ // TODO: For now we assume the type of the generated tokens is the
+ // same as the indexed field.
+ // We need a better way of expressing this because tokens may be
+ // hashed, or an inverted-index may index a list type, etc.
+ int numTokenKeys = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+ ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenKeys];
+ IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenKeys];
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ tokenComparatorFactories[i] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
+ tokenTypeTraits[i] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+ }
+ if (isPartitioned) {
+ // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+ tokenComparatorFactories[numSecondaryKeys] = PointableBinaryComparatorFactory
+ .of(ShortPointable.FACTORY);
+ tokenTypeTraits[numSecondaryKeys] = ShortPointable.TYPE_TRAITS;
+ }
+
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
+ List<LogicalVariable> outputVars = unnestMap.getVariables();
+ if (retainInput) {
+ outputVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(unnestMap, outputVars);
+ }
+ RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+
+ int start = outputRecDesc.getFieldCount() - numPrimaryKeys;
+ IBinaryComparatorFactory[] invListsComparatorFactories = JobGenHelper
+ .variablesToAscBinaryComparatorFactories(outputVars, start, numPrimaryKeys, typeEnv, context);
+ ITypeTraits[] invListsTypeTraits = JobGenHelper.variablesToTypeTraits(outputVars, start, numPrimaryKeys,
+ typeEnv, context);
+
+ ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
+ dataset, recordType, context.getBinaryComparatorFactoryProvider());
+
+ int[] filterFields = null;
+ int[] invertedIndexFields = null;
+ int[] filterFieldsForNonBulkLoadOps = null;
+ int[] invertedIndexFieldsForNonBulkLoadOps = null;
+ if (filterTypeTraits != null) {
+ filterFields = new int[1];
+ filterFields[0] = numTokenKeys + numPrimaryKeys;
+ invertedIndexFields = new int[numTokenKeys + numPrimaryKeys];
+ for (int k = 0; k < invertedIndexFields.length; k++) {
+ invertedIndexFields[k] = k;
+ }
+
+ filterFieldsForNonBulkLoadOps = new int[1];
+ filterFieldsForNonBulkLoadOps[0] = numPrimaryKeys + numSecondaryKeys;
+ invertedIndexFieldsForNonBulkLoadOps = new int[numPrimaryKeys + numSecondaryKeys];
+ for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
+ invertedIndexFieldsForNonBulkLoadOps[k] = k;
+ }
+ }
+
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, indexName,
+ dataset.getDatasetDetails().isTemp());
+ // TODO: Here we assume there is only one search key field.
+ int queryField = keyFields[0];
+ // Get tokenizer and search modifier factories.
+ IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod
+ .getSearchModifierFactory(searchModifierType, simThresh, secondaryIndex);
+ IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(
+ searchModifierType, searchKeyType, secondaryIndex);
+ IIndexDataflowHelperFactory dataflowHelperFactory;
+
+ AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+ dataset, metadataProvider.getMetadataTxnContext());
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ if (!isPartitioned) {
+ dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+ filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
+ } else {
+ dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+ filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
+ }
+ LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
+ jobSpec, queryField, appContext.getStorageManagerInterface(), secondarySplitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListsComparatorFactories, dataflowHelperFactory, queryTokenizerFactory,
+ searchModifierFactory, outputRecDesc, retainInput, retainNull, context.getNullWriterFactory(),
+ NoOpOperationCallbackFactory.INSTANCE, minFilterFieldIndexes, maxFilterFieldIndexes);
+
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(invIndexSearchOp,
+ secondarySplitsAndConstraint.second);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
new file mode 100644
index 0000000..33f906c
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.optimizer.rules.am.RTreeJobGenParams;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Contributes the runtime operator for an unnest-map representing a RTree
+ * search.
+ */
+public class RTreeSearchPOperator extends IndexSearchPOperator {
+
+ public RTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+ super(idx, requiresBroadcast);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RTREE_SEARCH;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ UnnestMapOperator unnestMap = (UnnestMapOperator) op;
+ ILogicalExpression unnestExpr = unnestMap.getExpressionRef().getValue();
+ if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ throw new IllegalStateException();
+ }
+ AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr;
+ FunctionIdentifier funcIdent = unnestFuncExpr.getFunctionIdentifier();
+ if (!funcIdent.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
+ return;
+ }
+ RTreeJobGenParams jobGenParams = new RTreeJobGenParams();
+ jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
+ int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
+
+ int[] minFilterFieldIndexes = getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas);
+ int[] maxFilterFieldIndexes = getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas);
+
+ AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
+ Dataset dataset = mp.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
+ List<LogicalVariable> outputVars = unnestMap.getVariables();
+ if (jobGenParams.getRetainInput()) {
+ outputVars = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(unnestMap, outputVars);
+ }
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = mp.buildRtreeRuntime(
+ builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(),
+ jobGenParams.getRetainNull(), dataset, jobGenParams.getIndexName(), keyIndexes, minFilterFieldIndexes,
+ maxFilterFieldIndexes);
+
+ builder.contributeHyracksOperator(unnestMap, rtreeSearch.first);
+ builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, rtreeSearch.second);
+ ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java b/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java
new file mode 100644
index 0000000..99e8f25
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/jobgen/AqlLogicalExpressionJobGen.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.jobgen;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionDescriptorTag;
+import edu.uci.ics.asterix.external.library.ExternalFunctionDescriptorProvider;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.comparisons.ComparisonEvalFactory;
+import edu.uci.ics.asterix.runtime.formats.FormatUtils;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+
+/**
+ * Translates AQL logical expressions into runtime evaluator, aggregate, running-aggregate and unnesting
+ * function factories. Built-in functions are resolved through the default data format
+ * ({@code FormatUtils.getDefaultFormat()}); external scalar functions are resolved through
+ * {@code ExternalFunctionDescriptorProvider}.
+ */
+public class AqlLogicalExpressionJobGen implements ILogicalExpressionJobGen {
+
+    /** Shared instance; the class has no instance state. */
+    public static final AqlLogicalExpressionJobGen INSTANCE = new AqlLogicalExpressionJobGen();
+
+    private AqlLogicalExpressionJobGen() {
+    }
+
+    /**
+     * Creates a (non-serializable) aggregate function factory for {@code expr}.
+     *
+     * @return the descriptor's aggregate factory, or {@code null} when the resolved descriptor is tagged
+     *         {@code SERIALAGGREGATE}
+     * @throws IllegalStateException if the descriptor is neither {@code AGGREGATE} nor {@code SERIALAGGREGATE}
+     */
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        IFunctionDescriptor fd = getFunctionDescriptor(expr, env, context);
+        switch (fd.getFunctionDescriptorTag()) {
+            case SERIALAGGREGATE:
+                // NOTE(review): presumably callers use createSerializableAggregateFunctionFactory for these.
+                return null;
+            case AGGREGATE:
+                return fd.createAggregateFunctionFactory(args);
+            default:
+                throw new IllegalStateException("Invalid function descriptor " + fd.getFunctionDescriptorTag()
+                        + " expected " + FunctionDescriptorTag.SERIALAGGREGATE + " or "
+                        + FunctionDescriptorTag.AGGREGATE);
+        }
+    }
+
+    /** Creates a running-aggregate factory for {@code expr}, resolved through the default data format. */
+    @Override
+    public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(
+            StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
+            JobGenContext context) throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        return getFunctionDescriptor(expr, env, context).createRunningAggregateFunctionFactory(args);
+    }
+
+    /** Creates an unnesting-function factory for {@code expr}, resolved through the default data format. */
+    @Override
+    public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        return getFunctionDescriptor(expr, env, context).createUnnestingFunctionFactory(args);
+    }
+
+    /**
+     * Creates an evaluator factory for a variable reference, a constant, or a scalar function call.
+     *
+     * @throws IllegalStateException for any other expression tag
+     */
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        switch (expr.getExpressionTag()) {
+            case VARIABLE:
+                return createVariableEvaluatorFactory((VariableReferenceExpression) expr, inputSchemas, context);
+            case CONSTANT:
+                return createConstantEvaluatorFactory((ConstantExpression) expr, inputSchemas, context);
+            case FUNCTION_CALL:
+                return createScalarFunctionEvaluatorFactory((AbstractFunctionCallExpression) expr, env,
+                        inputSchemas, context);
+            default:
+                throw new IllegalStateException("Unsupported expression tag: " + expr.getExpressionTag());
+        }
+    }
+
+    /**
+     * Returns a column accessor for the position of the referenced variable in the first input schema that
+     * contains it.
+     *
+     * @throws AlgebricksException if no input schema contains the variable
+     */
+    private ICopyEvaluatorFactory createVariableEvaluatorFactory(VariableReferenceExpression expr,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        LogicalVariable variable = expr.getVariableReference();
+        for (IOperatorSchema scm : inputSchemas) {
+            int pos = scm.findVariable(variable);
+            if (pos >= 0) {
+                return new ColumnAccessEvalFactory(pos);
+            }
+        }
+        throw new AlgebricksException("Variable " + variable + " could not be found in any input schema.");
+    }
+
+    /**
+     * Creates an evaluator factory for a scalar function call. Built-in comparison functions map directly to
+     * {@link ComparisonEvalFactory}; external functions are resolved through
+     * {@link ExternalFunctionDescriptorProvider}; all others through the default data format.
+     */
+    private ICopyEvaluatorFactory createScalarFunctionEvaluatorFactory(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        FunctionIdentifier fi = expr.getFunctionIdentifier();
+        ComparisonKind ck = AlgebricksBuiltinFunctions.getComparisonType(fi);
+        if (ck != null) {
+            // Assumes comparison calls always carry exactly two arguments (left, right).
+            return new ComparisonEvalFactory(args[0], args[1], ck);
+        }
+
+        IFunctionDescriptor fd;
+        if (expr.getFunctionInfo() instanceof IExternalFunctionInfo) {
+            try {
+                fd = ExternalFunctionDescriptorProvider
+                        .getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo());
+            } catch (AsterixException ae) {
+                throw new AlgebricksException(ae);
+            }
+        } else {
+            fd = getFunctionDescriptor(expr, env, context);
+        }
+        return fd.createEvaluatorFactory(args);
+    }
+
+    /** Creates an evaluator factory for the constant's value using the default data format. */
+    private ICopyEvaluatorFactory createConstantEvaluatorFactory(ConstantExpression expr,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
+        IDataFormat format = FormatUtils.getDefaultFormat();
+        return format.getConstantEvalFactory(expr.getValue());
+    }
+
+    /** Compiles each argument of {@code expr}, in order, into an evaluator factory. */
+    private ICopyEvaluatorFactory[] codegenArguments(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
+            throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> arguments = expr.getArguments();
+        ICopyEvaluatorFactory[] args = new ICopyEvaluatorFactory[arguments.size()];
+        int i = 0;
+        for (Mutable<ILogicalExpression> a : arguments) {
+            args[i++] = createEvaluatorFactory(a.getValue(), env, inputSchemas, context);
+        }
+        return args;
+    }
+
+    /**
+     * Creates a serializable aggregate function factory for {@code expr}. An {@code AGGREGATE} descriptor is
+     * converted to its serializable counterpart when one exists; a {@code SERIALAGGREGATE} descriptor is used
+     * directly.
+     *
+     * @throws AlgebricksException if an {@code AGGREGATE} function has no serializable counterpart
+     * @throws IllegalStateException if the descriptor is neither {@code AGGREGATE} nor {@code SERIALAGGREGATE}
+     */
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
+            JobGenContext context) throws AlgebricksException {
+        ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
+        IFunctionDescriptor fd = getFunctionDescriptor(expr, env, context);
+
+        switch (fd.getFunctionDescriptorTag()) {
+            case AGGREGATE: {
+                if (AsterixBuiltinFunctions.isAggregateFunctionSerializable(fd.getIdentifier())) {
+                    AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
+                            .makeSerializableAggregateFunctionExpression(fd.getIdentifier(), expr.getArguments());
+                    IFunctionDescriptor afdd = getFunctionDescriptor(serialAggExpr, env, context);
+                    return afdd.createSerializableAggregateFunctionFactory(args);
+                } else {
+                    throw new AlgebricksException(
+                            "Trying to create a serializable aggregate from a non-serializable aggregate function descriptor. (fi="
+                                    + expr.getFunctionIdentifier() + ")");
+                }
+            }
+            case SERIALAGGREGATE: {
+                return fd.createSerializableAggregateFunctionFactory(args);
+            }
+
+            default:
+                throw new IllegalStateException("Invalid function descriptor " + fd.getFunctionDescriptorTag()
+                        + " expected " + FunctionDescriptorTag.SERIALAGGREGATE + " or "
+                        + FunctionDescriptorTag.AGGREGATE);
+        }
+    }
+
+    /** Resolves {@code expr} to a function descriptor through the default data format. */
+    private IFunctionDescriptor getFunctionDescriptor(AbstractFunctionCallExpression expr,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+        return FormatUtils.getDefaultFormat().resolveFunction(expr, env);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
new file mode 100644
index 0000000..2d3ca0a
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.base;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+
+/**
+ * Static helpers used by optimizer rules to inspect logical operators and expressions.
+ */
+public class AnalysisUtil {
+
+    /**
+     * Function identifiers recognised by {@link #isRunnableFieldAccessFunction(FunctionIdentifier)}.
+     * Filled once by the static initializer below. The field is private and final, so it cannot be
+     * reassigned or modified from outside this class.
+     */
+    private static final List<FunctionIdentifier> RUNNABLE_FIELD_ACCESS_FUNCTIONS =
+            new ArrayList<FunctionIdentifier>();
+    static {
+        RUNNABLE_FIELD_ACCESS_FUNCTIONS.add(AsterixBuiltinFunctions.GET_DATA);
+        RUNNABLE_FIELD_ACCESS_FUNCTIONS.add(AsterixBuiltinFunctions.GET_HANDLE);
+        RUNNABLE_FIELD_ACCESS_FUNCTIONS.add(AsterixBuiltinFunctions.TYPE_OF);
+    }
+
+    /**
+     * Returns the first input of {@code op} if its operator tag is {@code opType}. Returns {@code null}
+     * otherwise, including when {@code op} has no inputs.
+     */
+    public static final ILogicalOperator firstChildOfType(AbstractLogicalOperator op, LogicalOperatorTag opType) {
+        List<Mutable<ILogicalOperator>> ins = op.getInputs();
+        if (ins == null || ins.isEmpty()) {
+            return null;
+        }
+        AbstractLogicalOperator child = (AbstractLogicalOperator) ins.get(0).getValue();
+        return child.getOperatorTag() == opType ? child : null;
+    }
+
+    /**
+     * Counts variable references in {@code e}, recursing through function-call arguments. Every occurrence
+     * is counted, so a variable referenced twice contributes 2.
+     *
+     * @throws IllegalArgumentException for an expression tag other than CONSTANT, FUNCTION_CALL or VARIABLE
+     */
+    public static int numberOfVarsInExpr(ILogicalExpression e) {
+        switch (e.getExpressionTag()) {
+            case CONSTANT:
+                return 0;
+            case FUNCTION_CALL: {
+                int count = 0;
+                for (Mutable<ILogicalExpression> arg : ((AbstractFunctionCallExpression) e).getArguments()) {
+                    count += numberOfVarsInExpr(arg.getValue());
+                }
+                return count;
+            }
+            case VARIABLE:
+                return 1;
+            default:
+                assert false;
+                throw new IllegalArgumentException("Unexpected expression tag: " + e.getExpressionTag());
+        }
+    }
+
+    /** Returns whether {@code fid} is one of the runnable field-access functions (get-data, get-handle, type-of). */
+    public static boolean isRunnableFieldAccessFunction(FunctionIdentifier fid) {
+        return RUNNABLE_FIELD_ACCESS_FUNCTIONS.contains(fid);
+    }
+
+    /** Returns whether {@code e} is a call to a dataset function. */
+    public static boolean isDataSetCall(ILogicalExpression e) {
+        FunctionIdentifier fid = functionIdentifierOf(e);
+        return fid != null && AsterixBuiltinFunctions.isDatasetFunction(fid);
+    }
+
+    /** Returns whether {@code expr} is a call to a runnable field-access function. */
+    public static boolean isRunnableAccessToFieldRecord(ILogicalExpression expr) {
+        FunctionIdentifier fid = functionIdentifierOf(expr);
+        return fid != null && isRunnableFieldAccessFunction(fid);
+    }
+
+    /** Returns whether {@code expr} is a field-access-by-name call. */
+    public static boolean isAccessByNameToFieldRecord(ILogicalExpression expr) {
+        FunctionIdentifier fid = functionIdentifierOf(expr);
+        return fid != null && fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME);
+    }
+
+    /** Returns whether {@code expr} is a field access by index, by name, or nested. */
+    public static boolean isAccessToFieldRecord(ILogicalExpression expr) {
+        FunctionIdentifier fid = functionIdentifierOf(expr);
+        return fid != null
+                && (fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX)
+                        || fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME)
+                        || fid.equals(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED));
+    }
+
+    /**
+     * Returns (dataverse name, datasource name) for the data source read by {@code op}.
+     * NOTE(review): assumes the data source id is an {@link AqlSourceId}; any other id type causes a
+     * ClassCastException.
+     */
+    public static Pair<String, String> getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
+        AqlSourceId srcId = (AqlSourceId) op.getDataSource().getId();
+        return new Pair<String, String>(srcId.getDataverseName(), srcId.getDatasourceName());
+    }
+
+    /** Returns the function identifier of {@code expr} if it is a function call, otherwise {@code null}. */
+    private static FunctionIdentifier functionIdentifierOf(ILogicalExpression expr) {
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return null;
+        }
+        return ((AbstractFunctionCallExpression) expr).getFunctionIdentifier();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
new file mode 100644
index 0000000..063c887
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.base;
+
+import java.util.ArrayList;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Helpers for fuzzy (similarity) queries. They resolve the configured similarity function and threshold
+ * from metadata properties, and map function names to identifiers, tokenizers and comparison operators.
+ */
+public class FuzzyUtils {
+
+    private final static String DEFAULT_SIM_FUNCTION = "jaccard";
+    private final static float JACCARD_DEFAULT_SIM_THRESHOLD = .8f;
+    private final static int EDIT_DISTANCE_DEFAULT_SIM_THRESHOLD = 1;
+
+    /** Metadata property that names the similarity function. */
+    private final static String SIM_FUNCTION_PROP_NAME = "simfunction";
+    /** Metadata property that holds the similarity threshold. */
+    private final static String SIM_THRESHOLD_PROP_NAME = "simthreshold";
+
+    public final static String JACCARD_FUNCTION_NAME = "jaccard";
+    public final static String EDIT_DISTANCE_FUNCTION_NAME = "edit-distance";
+
+    /**
+     * Returns the tokenizer function for inputs of type {@code inputTag}. Strings get the hashed word-token
+     * counter. List types and ANY get {@code null}; presumably no tokenization is applied to them (confirm).
+     *
+     * @throws NotImplementedException for any other type tag
+     */
+    public static FunctionIdentifier getTokenizer(ATypeTag inputTag) {
+        switch (inputTag) {
+            case STRING:
+                return AsterixBuiltinFunctions.COUNTHASHED_WORD_TOKENS;
+            case UNORDEREDLIST:
+            case ORDEREDLIST:
+            case ANY:
+                return null;
+            default:
+                throw new NotImplementedException("No tokenizer for type " + inputTag);
+        }
+    }
+
+    /**
+     * Returns the similarity threshold for {@code simFuncName}. The value is read from the "simthreshold"
+     * metadata property, or the function's default is used when the property is unset.
+     *
+     * @return an {@link AFloat} for Jaccard, an {@link AInt32} for edit distance, or {@code null} for an
+     *         unknown function name
+     * @throws NumberFormatException if the property is set but cannot be parsed as the threshold type
+     */
+    public static IAObject getSimThreshold(AqlMetadataProvider metadata, String simFuncName) {
+        String simThresholdValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
+            float jaccardThreshold = simThresholdValue != null ? Float.parseFloat(simThresholdValue)
+                    : JACCARD_DEFAULT_SIM_THRESHOLD;
+            return new AFloat(jaccardThreshold);
+        }
+        if (simFuncName.equals(EDIT_DISTANCE_FUNCTION_NAME)) {
+            // Edit distance is an integer. The default is therefore an AInt32 too, so the result type does
+            // not depend on whether the property is set.
+            int editDistanceThreshold = simThresholdValue != null ? Integer.parseInt(simThresholdValue)
+                    : EDIT_DISTANCE_DEFAULT_SIM_THRESHOLD;
+            return new AInt32(editDistanceThreshold);
+        }
+        return null;
+    }
+
+    /** Maps a similarity function name to its built-in function identifier, or {@code null} if unknown. */
+    public static FunctionIdentifier getFunctionIdentifier(String simFuncName) {
+        if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
+            return AsterixBuiltinFunctions.SIMILARITY_JACCARD;
+        } else if (simFuncName.equals(EDIT_DISTANCE_FUNCTION_NAME)) {
+            return AsterixBuiltinFunctions.EDIT_DISTANCE;
+        }
+        return null;
+    }
+
+    /**
+     * Builds the threshold comparison for {@code simFuncName} over {@code cmpArgs}. Jaccard similarity uses
+     * {@code >=}; edit distance uses {@code <=}.
+     *
+     * @return the comparison call, or {@code null} for an unknown function name
+     */
+    public static ScalarFunctionCallExpression getComparisonExpr(String simFuncName,
+            ArrayList<Mutable<ILogicalExpression>> cmpArgs) {
+        if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
+            return new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.GE),
+                    cmpArgs);
+        } else if (simFuncName.equals(EDIT_DISTANCE_FUNCTION_NAME)) {
+            return new ScalarFunctionCallExpression(FunctionUtils.getFunctionInfo(AlgebricksBuiltinFunctions.LE),
+                    cmpArgs);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the "simthreshold" property parsed as a float, or the Jaccard default when it is unset.
+     *
+     * @throws NumberFormatException if the property is set but is not a valid float
+     */
+    public static float getSimThreshold(AqlMetadataProvider metadata) {
+        String simThresholdValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        if (simThresholdValue == null) {
+            return JACCARD_DEFAULT_SIM_THRESHOLD;
+        }
+        return Float.parseFloat(simThresholdValue);
+    }
+
+    /**
+     * Returns the configured similarity function name ("simfunction" property), lower-cased, or the default
+     * ("jaccard") when unset.
+     */
+    // TODO: The default function should depend on the input types.
+    public static String getSimFunction(AqlMetadataProvider metadata) {
+        String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME);
+        if (simFunction == null) {
+            simFunction = DEFAULT_SIM_FUNCTION;
+        }
+        // Locale-independent lowercasing: under e.g. a Turkish default locale, "EDIT-DISTANCE".toLowerCase()
+        // yields a dotless 'ı' and would never match EDIT_DISTANCE_FUNCTION_NAME.
+        return simFunction.toLowerCase(java.util.Locale.ROOT);
+    }
+}