You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/10/16 18:08:28 UTC
[3/4] asterixdb git commit: Enhanced Insert AQL
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 4a79387..b1f646a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -36,7 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -187,7 +187,7 @@ public class SweepIllegalNonfunctionalFunctions extends AbstractExtractExprRule
}
@Override
- public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
return null;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 17dec7c..c033214 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 16ac80d..ec29b53 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.asterix.om.util.ConstantExpressionUtil;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.dataflow.data.common.AqlExpressionTypeComputer;
import org.apache.asterix.metadata.api.IMetadataEntity;
@@ -39,6 +38,7 @@ import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.ConstantExpressionUtil;
import org.apache.asterix.optimizer.rules.am.OptimizableOperatorSubTree.DataSourceType;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -72,15 +72,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
private AqlMetadataProvider metadataProvider;
// Function Identifier sets that retain the original field variable through each function's arguments
- private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName =
- ImmutableSet.of(AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS,
- AsterixBuiltinFunctions.SUBSTRING, AsterixBuiltinFunctions.SUBSTRING_BEFORE,
- AsterixBuiltinFunctions.SUBSTRING_AFTER, AsterixBuiltinFunctions.CREATE_POLYGON,
- AsterixBuiltinFunctions.CREATE_MBR, AsterixBuiltinFunctions.CREATE_RECTANGLE,
- AsterixBuiltinFunctions.CREATE_CIRCLE, AsterixBuiltinFunctions.CREATE_LINE,
- AsterixBuiltinFunctions.CREATE_POINT, AsterixBuiltinFunctions.NUMERIC_ADD,
- AsterixBuiltinFunctions.NUMERIC_SUBTRACT, AsterixBuiltinFunctions.NUMERIC_MULTIPLY,
- AsterixBuiltinFunctions.NUMERIC_DIVIDE, AsterixBuiltinFunctions.NUMERIC_MOD);
+ private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName = ImmutableSet.of(
+ AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS, AsterixBuiltinFunctions.SUBSTRING,
+ AsterixBuiltinFunctions.SUBSTRING_BEFORE, AsterixBuiltinFunctions.SUBSTRING_AFTER,
+ AsterixBuiltinFunctions.CREATE_POLYGON, AsterixBuiltinFunctions.CREATE_MBR,
+ AsterixBuiltinFunctions.CREATE_RECTANGLE, AsterixBuiltinFunctions.CREATE_CIRCLE,
+ AsterixBuiltinFunctions.CREATE_LINE, AsterixBuiltinFunctions.CREATE_POINT,
+ AsterixBuiltinFunctions.NUMERIC_ADD, AsterixBuiltinFunctions.NUMERIC_SUBTRACT,
+ AsterixBuiltinFunctions.NUMERIC_MULTIPLY, AsterixBuiltinFunctions.NUMERIC_DIVIDE,
+ AsterixBuiltinFunctions.NUMERIC_MOD);
public abstract Map<FunctionIdentifier, List<IAccessMethod>> getAccessMethods();
@@ -108,7 +108,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree,
Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
// Check applicability of indexes by access method type.
while (amIt.hasNext()) {
@@ -145,15 +145,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
return list.isEmpty() ? null : list.get(0);
}
- protected List<Pair<IAccessMethod, Index>>
- chooseAllIndex(Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
+ protected List<Pair<IAccessMethod, Index>> chooseAllIndex(
+ Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
List<Pair<IAccessMethod, Index>> result = new ArrayList<Pair<IAccessMethod, Index>>();
Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
while (amIt.hasNext()) {
Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry = amIt.next();
AccessMethodAnalysisContext analysisCtx = amEntry.getValue();
- Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt =
- analysisCtx.indexExprsAndVars.entrySet().iterator();
+ Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt = analysisCtx.indexExprsAndVars.entrySet()
+ .iterator();
while (indexIt.hasNext()) {
Map.Entry<Index, List<Pair<Integer, Integer>>> indexEntry = indexIt.next();
// To avoid a case where the chosen access method and a chosen
@@ -167,15 +167,14 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
// LENGTH_PARTITIONED_NGRAM_INVIX]
IAccessMethod chosenAccessMethod = amEntry.getKey();
Index chosenIndex = indexEntry.getKey();
- boolean isKeywordOrNgramIndexChosen =
- chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
- || chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX
- || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
- || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX;
-
- if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && chosenIndex.getIndexType() == IndexType.BTREE)
- || (chosenAccessMethod == RTreeAccessMethod.INSTANCE
- && chosenIndex.getIndexType() == IndexType.RTREE)
+ IndexType indexType = chosenIndex.getIndexType();
+ boolean isKeywordOrNgramIndexChosen = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+ || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX
+ || indexType == IndexType.SINGLE_PARTITION_WORD_INVIX
+ || indexType == IndexType.SINGLE_PARTITION_NGRAM_INVIX;
+
+ if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && indexType == IndexType.BTREE)
+ || (chosenAccessMethod == RTreeAccessMethod.INSTANCE && indexType == IndexType.RTREE)
|| (chosenAccessMethod == InvertedIndexAccessMethod.INSTANCE && isKeywordOrNgramIndexChosen)) {
result.add(new Pair<IAccessMethod, Index>(chosenAccessMethod, chosenIndex));
}
@@ -196,8 +195,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
*/
public void pruneIndexCandidates(IAccessMethod accessMethod, AccessMethodAnalysisContext analysisCtx,
IOptimizationContext context, IVariableTypeEnvironment typeEnvironment) throws AlgebricksException {
- Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt =
- analysisCtx.indexExprsAndVars.entrySet().iterator();
+ Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt = analysisCtx.indexExprsAndVars
+ .entrySet().iterator();
// Used to keep track of matched expressions (added for prefix search)
int numMatchedKeys = 0;
ArrayList<Integer> matchedExpressions = new ArrayList<Integer>();
@@ -226,24 +225,22 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
}
boolean typeMatch = true;
//Prune indexes based on field types
- List<IAType> indexedTypes = new ArrayList<IAType>();
+ List<IAType> matchedTypes = new ArrayList<>();
//retrieve types of expressions joined/selected with an indexed field
for (int j = 0; j < optFuncExpr.getNumLogicalVars(); j++) {
if (j != exprAndVarIdx.second) {
- indexedTypes.add(optFuncExpr.getFieldType(j));
+ matchedTypes.add(optFuncExpr.getFieldType(j));
}
}
- //add constants in case of select
- if (indexedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1
- && optFuncExpr.getNumConstantAtRuntimeExpr() > 0) {
- indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
+ if (matchedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1) {
+ matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
optFuncExpr.getConstantAtRuntimeExpr(0), context.getMetadataProvider(),
typeEnvironment));
}
//infer type of logicalExpr based on index keyType
- indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
+ matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
optFuncExpr.getLogicalExpr(exprAndVarIdx.second), null, new IVariableTypeEnvironment() {
@Override
@@ -257,7 +254,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
@Override
public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
List<List<LogicalVariable>> correlatedNullableVariableLists)
- throws AlgebricksException {
+ throws AlgebricksException {
if (var.equals(optFuncExpr.getSourceVar(exprAndVarIdx.second))) {
return keyType;
}
@@ -285,16 +282,16 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
boolean jaccardSimilarity = optFuncExpr.getFuncExpr().getFunctionIdentifier().getName()
.startsWith("similarity-jaccard-check");
- for (int j = 0; j < indexedTypes.size(); j++) {
- for (int k = j + 1; k < indexedTypes.size(); k++) {
- typeMatch &= isMatched(indexedTypes.get(j), indexedTypes.get(k), jaccardSimilarity);
+ for (int j = 0; j < matchedTypes.size(); j++) {
+ for (int k = j + 1; k < matchedTypes.size(); k++) {
+ typeMatch &= isMatched(matchedTypes.get(j), matchedTypes.get(k), jaccardSimilarity);
}
}
// Check if any field name in the optFuncExpr matches.
if (optFuncExpr.findFieldName(keyField) != -1) {
- foundKeyField =
- typeMatch && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan();
+ foundKeyField = typeMatch
+ && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan();
if (foundKeyField) {
matchedExpressions.add(exprAndVarIdx.first);
numMatchedKeys++;
@@ -369,8 +366,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
continue;
}
AbstractFunctionCallExpression argFuncExpr = (AbstractFunctionCallExpression) argExpr;
- boolean matchFound =
- analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context, typeEnvironment);
+ boolean matchFound = analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context,
+ typeEnvironment);
found = found || matchFound;
}
return found;
@@ -435,14 +432,13 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
protected boolean fillIndexExprs(List<Index> datasetIndexes, List<String> fieldName, IAType fieldType,
IOptimizableFuncExpr optFuncExpr, int matchedFuncExprIndex, int varIdx,
OptimizableOperatorSubTree matchedSubTree, AccessMethodAnalysisContext analysisCtx)
- throws AlgebricksException {
+ throws AlgebricksException {
List<Index> indexCandidates = new ArrayList<Index>();
// Add an index to the candidates if one of the indexed fields is
// fieldName
for (Index index : datasetIndexes) {
// Need to also verify the index is pending no op
- if (index.getKeyFieldNames().contains(fieldName)
- && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
+ if (index.getKeyFieldNames().contains(fieldName) && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
indexCandidates.add(index);
if (optFuncExpr.getFieldType(varIdx) == BuiltinType.AMISSING
|| optFuncExpr.getFieldType(varIdx) == BuiltinType.ANY) {
@@ -540,8 +536,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
return;
}
}
- IAType fieldType =
- (IAType) context.getOutputTypeEnvironment(unnestOp).getType(optFuncExpr.getLogicalExpr(funcVarIndex));
+ IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
+ .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
// Set the fieldName in the corresponding matched function
// expression.
optFuncExpr.setFieldName(funcVarIndex, fieldName);
@@ -571,16 +567,14 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
// Remember matching subtree.
optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
- List<String> fieldName =
- getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
- subTree.getRecordType(), optVarIndex,
- optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(),
- datasetRecordVar, subTree.getMetaRecordType(), datasetMetaVar);
+ List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
+ subTree.getRecordType(), optVarIndex,
+ optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), datasetRecordVar,
+ subTree.getMetaRecordType(), datasetMetaVar);
if (fieldName == null) {
continue;
}
- IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
- .getType(optFuncExpr.getLogicalExpr(optVarIndex));
+ IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp).getVarType(var);
// Set the fieldName in the corresponding matched
// function expression.
optFuncExpr.setFieldName(optVarIndex, fieldName);
@@ -597,7 +591,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
private void matchVarsFromOptFuncExprToDataSourceScan(IOptimizableFuncExpr optFuncExpr, int optFuncExprIndex,
List<Index> datasetIndexes, List<LogicalVariable> dsVarList, OptimizableOperatorSubTree subTree,
AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean fromAdditionalDataSource)
- throws AlgebricksException {
+ throws AlgebricksException {
for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) {
LogicalVariable var = dsVarList.get(varIndex);
int funcVarIndex = optFuncExpr.findLogicalVar(var);
@@ -615,16 +609,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
// Check whether this variable is PK, not a record variable.
if (varIndex <= subTreePKs.size() - 1) {
fieldName = subTreePKs.get(varIndex);
- fieldType =
- (IAType) context.getOutputTypeEnvironment(
- subTree.getDataSourceRef().getValue()).getVarType(var);
+ fieldType = (IAType) context.getOutputTypeEnvironment(subTree.getDataSourceRef().getValue())
+ .getVarType(var);
}
} else {
// Need to check additional dataset one by one
for (int i = 0; i < subTree.getIxJoinOuterAdditionalDatasets().size(); i++) {
if (subTree.getIxJoinOuterAdditionalDatasets().get(i) != null) {
- subTreePKs = DatasetUtils.getPartitioningKeys(
- subTree.getIxJoinOuterAdditionalDatasets().get(i));
+ subTreePKs = DatasetUtils
+ .getPartitioningKeys(subTree.getIxJoinOuterAdditionalDatasets().get(i));
// Check whether this variable is PK, not a record variable.
if (subTreePKs.contains(var) && varIndex <= subTreePKs.size() - 1) {
@@ -667,11 +660,10 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
*
* @throws AlgebricksException
*/
- protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr,
- OptimizableOperatorSubTree subTree, int opIndex, int assignVarIndex, ARecordType recordType,
- int funcVarIndex, ILogicalExpression parentFuncExpr, LogicalVariable recordVar,
- ARecordType metaType, LogicalVariable metaVar)
- throws AlgebricksException {
+ protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree subTree,
+ int opIndex, int assignVarIndex, ARecordType recordType, int funcVarIndex,
+ ILogicalExpression parentFuncExpr, LogicalVariable recordVar, ARecordType metaType, LogicalVariable metaVar)
+ throws AlgebricksException {
// Get expression corresponding to opVar at varIndex.
AbstractLogicalExpression expr = null;
AbstractFunctionCallExpression childFuncExpr = null;
@@ -679,6 +671,10 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
AssignOperator assignOp = (AssignOperator) op;
expr = (AbstractLogicalExpression) assignOp.getExpressions().get(assignVarIndex).getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ // Not a function call: the cast to AbstractFunctionCallExpression below would fail, so give up here
+ return null;
+ }
childFuncExpr = (AbstractFunctionCallExpression) expr;
} else {
UnnestOperator unnestOp = (UnnestOperator) op;
@@ -723,8 +719,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
return null;
}
ConstantExpression constExpr = (ConstantExpression) nameArg;
- AOrderedList orderedNestedFieldName =
- (AOrderedList) ((AsterixConstantValue) constExpr.getValue()).getObject();
+ AOrderedList orderedNestedFieldName = (AOrderedList) ((AsterixConstantValue) constExpr.getValue())
+ .getObject();
nestedAccessFieldName = new ArrayList<String>();
for (int i = 0; i < orderedNestedFieldName.size(); i++) {
nestedAccessFieldName.add(((AString) orderedNestedFieldName.getItem(i)).getStringValue());
@@ -733,8 +729,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
isByName = true;
}
if (isFieldAccess) {
- LogicalVariable sourceVar =
- ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()).getVariableReference();
+ LogicalVariable sourceVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+ .getVariableReference();
optFuncExpr.setLogicalExpr(funcVarIndex, parentFuncExpr);
int[] assignAndExpressionIndexes = null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 874cc7c..23e45c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -56,7 +56,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -490,7 +490,7 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
}
@Override
- public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index eeb2c2a..d3a0c0f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -41,7 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -255,7 +255,7 @@ class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisit
}
@Override
- public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ccf0aeb..44bfbe4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -143,7 +143,7 @@ class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Bool
}
@Override
- public Boolean visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ public Boolean visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
return false;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index b184774..98c717c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -295,12 +295,17 @@ public class CompiledStatements {
private final String datasetName;
private final Query query;
private final int varCounter;
+ VariableExpr var;
+ Query returnQuery;
- public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter) {
+ public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter,
+ VariableExpr var, Query returnQuery) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.query = query;
this.varCounter = varCounter;
+ this.var = var;
+ this.returnQuery = returnQuery;
}
@Override
@@ -321,6 +326,14 @@ public class CompiledStatements {
return query;
}
+ public VariableExpr getVar() {
+ return var;
+ }
+
+ public Query getReturnQuery() {
+ return returnQuery;
+ }
+
@Override
public byte getKind() {
return Statement.Kind.INSERT;
@@ -329,8 +342,9 @@ public class CompiledStatements {
public static class CompiledUpsertStatement extends CompiledInsertStatement {
- public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter) {
- super(dataverseName, datasetName, query, varCounter);
+ public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter,
+ VariableExpr var, Query returnQuery) {
+ super(dataverseName, datasetName, query, varCounter, var, returnQuery);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 1b528b9..149656a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -116,6 +116,8 @@ public interface IStatementExecutor {
* @param dmlStatement
* The data modification statement when the query results in a modification to a dataset
* @return the compiled {@code JobSpecification}
+ * @param returnQuery
+ * For a DML statement, the query the user may run on the affected data
* @throws AsterixException
* @throws RemoteException
* @throws AlgebricksException
@@ -124,7 +126,7 @@ public interface IStatementExecutor {
*/
JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
ICompiledDmlStatement dmlStatement)
- throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
+ throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
/**
* returns the active dataverse for an entity or a statement
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 09a0476..9879da8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -27,11 +27,12 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.operators.CommitOperator;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -40,15 +41,15 @@ import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.lang.aql.util.RangeMapBuilder;
import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Expression.Kind;
import org.apache.asterix.lang.common.base.ILangExpression;
import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Expression.Kind;
import org.apache.asterix.lang.common.clause.GroupbyClause;
import org.apache.asterix.lang.common.clause.LetClause;
import org.apache.asterix.lang.common.clause.LimitClause;
import org.apache.asterix.lang.common.clause.OrderbyClause;
-import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.common.clause.OrderbyClause.OrderModifier;
+import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.FieldAccessor;
import org.apache.asterix.lang.common.expression.FieldBinding;
@@ -56,14 +57,14 @@ import org.apache.asterix.lang.common.expression.GbyVariableExpressionPair;
import org.apache.asterix.lang.common.expression.IfExpr;
import org.apache.asterix.lang.common.expression.IndexAccessor;
import org.apache.asterix.lang.common.expression.ListConstructor;
+import org.apache.asterix.lang.common.expression.ListConstructor.Type;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.expression.OperatorExpr;
import org.apache.asterix.lang.common.expression.QuantifiedExpression;
+import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier;
import org.apache.asterix.lang.common.expression.RecordConstructor;
import org.apache.asterix.lang.common.expression.UnaryExpr;
import org.apache.asterix.lang.common.expression.VariableExpr;
-import org.apache.asterix.lang.common.expression.ListConstructor.Type;
-import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.FunctionDecl;
import org.apache.asterix.lang.common.statement.Query;
@@ -74,13 +75,13 @@ import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.lang.common.visitor.base.AbstractQueryExpressionVisitor;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.declared.AqlSourceId;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.declared.LoadableDataSource;
import org.apache.asterix.metadata.declared.ResultSetDataSink;
import org.apache.asterix.metadata.declared.ResultSetSinkId;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.Function;
@@ -96,8 +97,10 @@ import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.util.FunctionCollection;
import org.apache.asterix.translator.util.PlanTranslationUtil;
@@ -116,15 +119,15 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
-import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
@@ -133,6 +136,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOper
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
@@ -140,13 +144,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -286,15 +290,30 @@ class LangExpressionToPlanTranslator
@Override
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
throws AlgebricksException {
- Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this,
- new MutableObject<>(new EmptyTupleSourceOperator()));
+ return translate(expr, outputDatasetName, stmt, null);
+ }
+
+ public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
+ ILogicalOperator baseOp) throws AlgebricksException {
+ MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
+ if (baseOp != null) {
+ base = new MutableObject<>(baseOp);
+ }
+ Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, base);
ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>();
ILogicalOperator topOp = p.first;
- ProjectOperator project = (ProjectOperator) topOp;
- LogicalVariable unnestVar = project.getVariables().get(0);
- LogicalVariable resVar = project.getVariables().get(0);
if (outputDatasetName == null) {
+ LogicalVariable resVar;
+ if (topOp instanceof ProjectOperator) {
+ resVar = ((ProjectOperator) topOp).getVariables().get(0);
+ } else if (topOp instanceof AssignOperator) {
+ resVar = ((AssignOperator) topOp).getVariables().get(0);
+ } else if (topOp instanceof AggregateOperator) {
+ resVar = ((AggregateOperator) topOp).getVariables().get(0);
+ } else {
+ throw new AlgebricksException("Invalid returning query");
+ }
FileSplit outputFileSplit = metadataProvider.getOutputFile();
if (outputFileSplit == null) {
outputFileSplit = getDefaultOutputFileLocation();
@@ -305,8 +324,9 @@ class LangExpressionToPlanTranslator
writeExprList.add(new MutableObject<>(new VariableReferenceExpression(resVar)));
ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
- topOp = new DistributeResultOperator(writeExprList, sink);
- topOp.getInputs().add(new MutableObject<>(project));
+ DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink);
+ newTop.getInputs().add(new MutableObject<>(topOp));
+ topOp = newTop;
// Retrieve the Output RecordType (if any) and store it on
// the DistributeResultOperator
@@ -315,6 +335,10 @@ class LangExpressionToPlanTranslator
topOp.getAnnotations().put("output-record-type", outputRecordType);
}
} else {
+ ProjectOperator project = (ProjectOperator) topOp;
+ LogicalVariable unnestVar = project.getVariables().get(0);
+ LogicalVariable resVar = project.getVariables().get(0);
+
/**
* add the collection-to-sequence right before the project,
* because dataset only accept non-collection records
@@ -380,12 +404,12 @@ class LangExpressionToPlanTranslator
switch (stmt.getKind()) {
case Statement.Kind.INSERT:
leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
- additionalFilteringExpressions, assign);
+ additionalFilteringExpressions, assign, stmt);
break;
case Statement.Kind.UPSERT:
leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, project, exprs,
- resVar, additionalFilteringAssign);
+ resVar, additionalFilteringAssign, stmt);
break;
case Statement.Kind.DELETE:
leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
@@ -418,7 +442,7 @@ class LangExpressionToPlanTranslator
varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
insertOp.getInputs().add(new MutableObject<>(assign));
- SinkOperator leafOperator = new SinkOperator();
+ ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
leafOperator.getInputs().add(new MutableObject<>(insertOp));
return leafOperator;
}
@@ -426,7 +450,7 @@ class LangExpressionToPlanTranslator
private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
List<Mutable<ILogicalExpression>> varRefsForLoading,
List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
- throws AlgebricksException {
+ throws AlgebricksException {
if (targetDatasource.getDataset().hasMetaPart()) {
throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
+ ": delete from dataset is not supported on Datasets with Meta records");
@@ -435,7 +459,7 @@ class LangExpressionToPlanTranslator
varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
deleteOp.getInputs().add(new MutableObject<>(assign));
- SinkOperator leafOperator = new SinkOperator();
+ ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
leafOperator.getInputs().add(new MutableObject<>(deleteOp));
return leafOperator;
}
@@ -528,7 +552,7 @@ class LangExpressionToPlanTranslator
project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
}
feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- SinkOperator leafOperator = new SinkOperator();
+ ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
return leafOperator;
}
@@ -537,14 +561,20 @@ class LangExpressionToPlanTranslator
List<Mutable<ILogicalExpression>> varRefsForLoading,
List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
List<String> additionalFilteringField, LogicalVariable unnestVar, ProjectOperator project,
- List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign)
- throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign,
+ ICompiledDmlStatement stmt) throws AlgebricksException {
if (!targetDatasource.getDataset().allow(project, Dataset.OP_UPSERT)) {
throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
+ ": upsert into dataset is not supported on Datasets with Meta records");
}
+ CompiledUpsertStatement compiledUpsert = (CompiledUpsertStatement) stmt;
+ InsertDeleteUpsertOperator upsertOp;
+ ILogicalOperator leafOperator;
if (targetDatasource.getDataset().hasMetaPart()) {
- InsertDeleteUpsertOperator feedModificationOp;
+ if (compiledUpsert.getReturnQuery() != null) {
+ throw new AlgebricksException("Returning not allowed on datasets with Meta records");
+
+ }
AssignOperator metaAndKeysAssign;
List<LogicalVariable> metaAndKeysVars;
List<Mutable<ILogicalExpression>> metaAndKeysExprs;
@@ -575,71 +605,113 @@ class LangExpressionToPlanTranslator
}
}
// A change feed, we don't need the assign to access PKs
- feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
- metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+ upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList,
+ InsertDeleteUpsertOperator.Kind.UPSERT, false);
// Create and add a new variable used for representing the original record
- feedModificationOp.setPrevRecordVar(context.newVar());
- feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
+ upsertOp.setPrevRecordVar(context.newVar());
+ upsertOp.setPrevRecordType(targetDatasource.getItemType());
if (targetDatasource.getDataset().hasMetaPart()) {
List<LogicalVariable> metaVars = new ArrayList<>();
metaVars.add(context.newVar());
- feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
+ upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
List<Object> metaTypes = new ArrayList<>();
metaTypes.add(targetDatasource.getMetaItemType());
- feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+ upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
}
if (additionalFilteringField != null) {
- feedModificationOp.setPrevFilterVar(context.newVar());
- feedModificationOp.setPrevFilterType(
+ upsertOp.setPrevFilterVar(context.newVar());
+ upsertOp.setPrevFilterType(
((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
additionalFilteringAssign.getInputs().clear();
additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
- feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+ upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
} else {
- feedModificationOp.getInputs().add(assign.getInputs().get(0));
+ upsertOp.getInputs().add(assign.getInputs().get(0));
}
metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
- feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- SinkOperator leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
- return leafOperator;
+ upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+ leafOperator = new DelegateOperator(new CommitOperator(true));
+ leafOperator.getInputs().add(new MutableObject<>(upsertOp));
+
} else {
- InsertDeleteUpsertOperator feedModificationOp;
- feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+ upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
- feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- feedModificationOp.getInputs().add(new MutableObject<>(assign));
+ upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+ upsertOp.getInputs().add(new MutableObject<>(assign));
// Create and add a new variable used for representing the original record
ARecordType recordType = (ARecordType) targetDatasource.getItemType();
- feedModificationOp.setPrevRecordVar(context.newVar());
- feedModificationOp.setPrevRecordType(recordType);
+ upsertOp.setPrevRecordVar(context.newVar());
+ upsertOp.setPrevRecordType(recordType);
if (additionalFilteringField != null) {
- feedModificationOp.setPrevFilterVar(context.newVar());
- feedModificationOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+ upsertOp.setPrevFilterVar(context.newVar());
+ upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+ }
+
+ if (compiledUpsert.getReturnQuery() != null) {
+ leafOperator = createReturningQuery(compiledUpsert, upsertOp);
+
+ } else {
+ leafOperator = new DelegateOperator(new CommitOperator(true));
+ leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertOp));
}
- SinkOperator leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
- return leafOperator;
}
+ return leafOperator;
+
+ }
+
+ private ILogicalOperator createReturningQuery(CompiledInsertStatement compiledInsert,
+ InsertDeleteUpsertOperator insertOp) throws AlgebricksException {
+ /*
+ * Builds the plan for a statement that carries a returning query: the query is translated with insertOp
+ * as its base operator, and a delegate-wrapped CommitOperator is then spliced in directly above insertOp.
+ * Returns the root of the translated plan. Throws AlgebricksException when an operator met on the walk
+ * from that root down to the InsertDeleteUpsertOperator does not have exactly one input.
+ *
+ * First step: make the id of the statement's variable point to the record variable, i.e. the variable
+ * referenced by insertOp's payload expression. The cast below assumes that payload is a plain
+ * VariableReferenceExpression.
+ */
+ context.newVar(compiledInsert.getVar());
+ context.setVar(compiledInsert.getVar(),
+ ((VariableReferenceExpression) insertOp.getPayloadExpression().getValue()).getVariableReference());
+ // Translate the returning query on top of insertOp; no output dataset name and no DML statement are passed.
+
+ ILogicalPlan planAfterInsert = translate(compiledInsert.getReturnQuery(), null, null, insertOp);
+ // Only the first root of the translated plan is used from here on.
+ ILogicalOperator finalRoot = planAfterInsert.getRoots().get(0).getValue();
+ ILogicalOperator op;
+ for (op = finalRoot;; op = op.getInputs().get(0).getValue()) { // follow each operator's only input downwards
+ if (op.getInputs().size() != 1) { // also true for an operator that has no inputs at all
+ throw new AlgebricksException("Cannot have a multi-branch returning query");
+ }
+ if (op.getInputs().get(0).getValue() instanceof InsertDeleteUpsertOperator) {
+ break; // op is now the operator sitting directly above the insert/delete/upsert operator
+ }
+ }
+ /*
+ * Re-link so that the chain reads op -> DelegateOperator(CommitOperator(false)) -> insertOp.
+ * NOTE(review): the non-returning paths in this file pass true to CommitOperator where it is the plan
+ * leaf; false here presumably marks a commit that is not the sink - confirm against CommitOperator.
+ */
+ op.getInputs().clear();
+ ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(false));
+ leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+ op.getInputs().add(new MutableObject<>(leafOperator));
+ leafOperator = finalRoot; // what is returned is the root of the returning-query plan, not the commit operator
+ return leafOperator;
}
private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
List<Mutable<ILogicalExpression>> varRefsForLoading,
- List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
- throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
+ ICompiledDmlStatement stmt) throws AlgebricksException { // stmt is consulted below for an optional returning query
if (targetDatasource.getDataset().hasMetaPart()) {
throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
+ ": insert into dataset is not supported on Datasets with Meta records");
}
+ ILogicalOperator leafOperator; // top of the plan fragment handed back to the caller
InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- insertOp.getInputs().add(new MutableObject<>(assign));
- SinkOperator leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<>(insertOp));
+ insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+ CompiledInsertStatement compiledInsert = (CompiledInsertStatement) stmt; // unchecked cast: stmt must be a CompiledInsertStatement
+ if (compiledInsert.getReturnQuery() != null) { // returning query present: its plan is built on top of insertOp
+ leafOperator = createReturningQuery(compiledInsert, insertOp);
+
+ } else { // no returning query: insertOp feeds a delegate-wrapped CommitOperator(true), which is returned
+ leafOperator = new DelegateOperator(new CommitOperator(true));
+ leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+ }
return leafOperator;
}
@@ -880,15 +952,15 @@ class LangExpressionToPlanTranslator
gOp.getInputs().add(topOp);
for (Entry<Expression, VariableExpr> entry : gc.getWithVarMap().entrySet()) {
- Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(
- entry.getKey(), new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
- List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
+ Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(entry.getKey(),
+ new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
+ List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
flArgs.add(new MutableObject<>(listifyInput.first));
AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
- .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
+ .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
LogicalVariable aggVar = context.newVar();
AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(aggVar),
- mkSingletonArrayList(new MutableObject<>(fListify)));
+ mkSingletonArrayList(new MutableObject<>(fListify)));
agg.getInputs().add(listifyInput.second);
@@ -945,8 +1017,8 @@ class LangExpressionToPlanTranslator
LogicalVariable unnestVar = context.newVar();
UnnestOperator unnestOp = new UnnestOperator(unnestVar,
new MutableObject<>(new UnnestingFunctionCallExpression(
- FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), Collections
- .singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
+ Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
unnestOp.getInputs().add(new MutableObject<>(assignOp));
// Produces the final result.
@@ -1514,7 +1586,7 @@ class LangExpressionToPlanTranslator
// There is a shared operator reference in the query plan.
// Deep copies the child plan.
LogicalOperatorDeepCopyWithNewVariablesVisitor visitor =
- new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
+ new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
ILogicalOperator newChild = childRef.getValue().accept(visitor, null);
LinkedHashMap<LogicalVariable, LogicalVariable> cloneVarMap = visitor
.getInputToOutputVariableMapping();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index 78c68e1..6c8019d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -25,21 +25,14 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
-import java.util.Collection;
import javax.imageio.ImageIO;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
-
public class FeedServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- private static final String FEED_EXTENSION_NAME = "Feed";
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
@@ -89,49 +82,4 @@ public class FeedServlet extends HttpServlet {
PrintWriter out = response.getWriter();
out.println(outStr);
}
-
- @SuppressWarnings("unused")
- private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
- }
-
- @SuppressWarnings("null")
- private void insertRow(StringBuilder html, FeedActivity activity) {
- String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
- String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
- String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
-
- FeedConnectionId connectionId = new FeedConnectionId(
- new EntityId(FEED_EXTENSION_NAME, activity.getDataverseName(), activity.getFeedName()),
- activity.getDatasetName());
- int intakeRate = 0;
- int storeRate = 0;
-
- html.append("<tr>");
- html.append("<td>" + activity.getFeedName() + "</td>");
- html.append("<td>" + activity.getDatasetName() + "</td>");
- html.append("<td>" + activity.getConnectTimestamp() + "</td>");
- //html.append("<td>" + insertLink(html, FeedDashboardServlet.getParameterizedURL(activity), "Details") + "</td>");
- html.append("<td>" + intake + "</td>");
- html.append("<td>" + compute + "</td>");
- html.append("<td>" + store + "</td>");
- String color = "black";
- if (intakeRate > storeRate) {
- color = "red";
- }
- if (intakeRate < 0) {
- html.append("<td>" + "UNKNOWN" + "</td>");
- } else {
- html.append("<td>" + insertColoredText("" + intakeRate, color) + " rec/sec" + "</td>");
- }
- if (storeRate < 0) {
- html.append("<td>" + "UNKNOWN" + "</td>");
- } else {
- html.append("<td>" + insertColoredText("" + storeRate, color) + " rec/sec" + "</td>");
- }
- html.append("</tr>");
- }
-
- private String insertColoredText(String s, String color) {
- return "<font color=\"" + color + "\">" + s + "</font>";
- }
}