You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/26 22:34:08 UTC
asterixdb-bad git commit: Pull mostly works, but stores the broker subs in the result (which we don't need)
Repository: asterixdb-bad
Updated Branches:
refs/heads/resultsFinalVersion [created] d7e09a96c
Pull mostly works, but stores the broker subs in the result (which we don't need)
Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/d7e09a96
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/d7e09a96
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/d7e09a96
Branch: refs/heads/resultsFinalVersion
Commit: d7e09a96cbfd02f3f982d13f42bc6a3a8b4ed6ce
Parents: ffc937a
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Tue Jun 26 15:33:18 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Tue Jun 26 15:33:18 2018 -0700
----------------------------------------------------------------------
.../InsertBrokerNotifierForChannelRule.java | 211 +++++--------------
1 file changed, 52 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d7e09a96/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 1ef678d..6b91a8c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -28,7 +28,6 @@ import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -48,8 +47,8 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionC
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -61,7 +60,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -134,9 +134,6 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
channelName = datasetName.substring(0, datasetName.length() - 13);
}
- //get channelSubscriptionIdVar
- LogicalVariable channelSubscriptionIdVar = subscriptionsScan.getVariables().get(0);
-
//The channelExecutionTime is created just before the scan
ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
@@ -147,52 +144,33 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
return false;
}
- if (!push) {
- ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
- }
-
//move broker scan
- AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "", "");
- if (opAboveBrokersScan == null) {
+ SubplanOperator subplanOperator = (SubplanOperator) findOp(op, 1, "", "");
+ if (subplanOperator == null) {
return false;
}
- DataSourceScanOperator brokerScan =
- moveScans(opAboveBrokersScan, op1, context, channelName + "BrokerSubscriptions");
- if (brokerScan == null) {
- return false;
- }
- DataSourceScanOperator brokerSubscriptionScan =
- (DataSourceScanOperator) brokerScan.getInputs().get(0).getValue();
-
- //Add select to join subscriptions and broker and assign to get endpoint
- LogicalVariable brokerDataverseVar = brokerScan.getVariables().get(0);
- LogicalVariable brokerNameVar = brokerScan.getVariables().get(1);
- LogicalVariable brokerVar = brokerScan.getVariables().get(2);
- LogicalVariable brokerSubscriptionVar = brokerSubscriptionScan.getVariables().get(2);
- LogicalVariable brokerSubscriptionIdVar = brokerSubscriptionScan.getVariables().get(1);
- LogicalVariable brokerSubscriptionChannelIdVar = brokerSubscriptionScan.getVariables().get(0);
-
- LogicalVariable brokerEndpointVar = context.newVar();
- AssignOperator assign = createAssignAndSelect(brokerDataverseVar, brokerNameVar, brokerVar,
- brokerSubscriptionVar, brokerSubscriptionIdVar, brokerEndpointVar, channelSubscriptionIdVar,
- brokerSubscriptionChannelIdVar, context, op1);
+ LogicalVariable brokerEndpoint = context.newVar();
+ LogicalVariable brokerSubId = context.newVar();
+ LogicalVariable brokerSubsVar =
+ ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
+ .getVariables().get(0);
+
+ AssignOperator newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context);
context.computeAndSetTypeEnvironmentForOperator(op1);
//Maybe we need to add a project???
ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
badProject.getVariables().add(channelExecutionVar);
- badProject.getVariables().add(channelSubscriptionIdVar);
+ badProject.getVariables().add(brokerSubsVar);
context.computeAndSetTypeEnvironmentForOperator(badProject);
-
//Create my brokerNotify plan above the extension Operator
DelegateOperator dOp = push
- ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
- context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
- : createNotifyBrokerPullPlan(brokerEndpointVar, brokerSubscriptionIdVar, channelExecutionVar, context,
- assign,
+ ? createNotifyBrokerPushPlan(brokerEndpoint, badProject.getVariables().get(0), channelExecutionVar,
+ context, newAssign, (DistributeResultOperator) op1, channelDataverse, channelName)
+ : createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign,
(DistributeResultOperator) op1, channelDataverse, channelName);
opRef.setValue(dOp);
@@ -200,106 +178,52 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
return true;
}
- private AssignOperator createAssignAndSelect(LogicalVariable brokerDataverseVar, LogicalVariable brokerNameVar,
- LogicalVariable brokerVar, LogicalVariable brokerSubscriptionVar, LogicalVariable brokerSubscriptionIdVar,
- LogicalVariable brokerEndpointVar, LogicalVariable channelSubscriptionIdVar,
- LogicalVariable brokerSubscriptionChannelIdVar, IOptimizationContext context, AbstractLogicalOperator op1)
+ private AssignOperator createAssignsAndUnnest(LogicalVariable brokerSubsVar, LogicalVariable brokerEndpoint,
+ LogicalVariable brokerSubId, AbstractLogicalOperator op, IOptimizationContext context)
throws AlgebricksException {
FunctionInfo finfoGetField =
- (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_INDEX);
- FunctionInfo finfoGetEquality = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.EQ);
- FunctionInfo finfoGetAnd = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND);
-
- //Create Select Operator
- //The operator matches (A) the broker dataverse and name between broker subscriptions and brokers
- // and (B) the channel subscription Id between broker subscriptions and channel subscriptions
- ScalarFunctionCallExpression getBrokerName = new ScalarFunctionCallExpression(finfoGetField,
- new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
- new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(3)))));
- ScalarFunctionCallExpression getBrokerDataverse = new ScalarFunctionCallExpression(finfoGetField,
- new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
- new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
-
- VariableReferenceExpression BrokerBrokerNameReference = new VariableReferenceExpression(brokerNameVar);
- VariableReferenceExpression BrokerBrokerDataverseReference =
- new VariableReferenceExpression(brokerDataverseVar);
- VariableReferenceExpression brokerSubscriptionChannelIdVarReference =
- new VariableReferenceExpression(brokerSubscriptionChannelIdVar);
-
- ScalarFunctionCallExpression brokerNameCheck = new ScalarFunctionCallExpression(finfoGetEquality,
- new MutableObject<>(getBrokerName), new MutableObject<>(BrokerBrokerNameReference));
- ScalarFunctionCallExpression brokerDataverseCheck = new ScalarFunctionCallExpression(finfoGetEquality,
- new MutableObject<>(getBrokerDataverse), new MutableObject<>(BrokerBrokerDataverseReference));
- ScalarFunctionCallExpression channelSubCheck = new ScalarFunctionCallExpression(finfoGetEquality,
- new MutableObject<>(brokerSubscriptionChannelIdVarReference),
- new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
-
- ScalarFunctionCallExpression brokerAndExpression = new ScalarFunctionCallExpression(finfoGetAnd,
- new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
-
- DataSourceScanOperator brokerScan = (DataSourceScanOperator) op1.getInputs().get(0).getValue();
-
- SelectOperator selectSubscriptions = new SelectOperator(new MutableObject<>(channelSubCheck), false, null);
- selectSubscriptions.getInputs().add(brokerScan.getInputs().get(0));
-
- SelectOperator selectBrokers = new SelectOperator(new MutableObject<>(brokerAndExpression), false, null);
- selectBrokers.getInputs().add(new MutableObject<>(brokerScan));
-
- brokerScan.getInputs().set(0, new MutableObject<>(selectSubscriptions));
-
- //Create Assign Operator
- ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
- new MutableObject<>(new VariableReferenceExpression(brokerVar)),
- new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
- AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
- assign.getInputs().add(new MutableObject<>(selectBrokers));
-
- op1.getInputs().set(0, new MutableObject<>(assign));
- context.computeAndSetTypeEnvironmentForOperator(selectSubscriptions);
- context.computeAndSetTypeEnvironmentForOperator(brokerScan);
- context.computeAndSetTypeEnvironmentForOperator(selectBrokers);
- context.computeAndSetTypeEnvironmentForOperator(assign);
- context.computeAndSetTypeEnvironmentForOperator(op1);
+ (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+ FunctionInfo finfoScanCollection =
+ (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION);
- return assign;
+ UnnestingFunctionCallExpression scanBrokerSubsVar = new UnnestingFunctionCallExpression(finfoScanCollection,
+ new MutableObject<>(new VariableReferenceExpression(brokerSubsVar)));
+ LogicalVariable unnestedBrokerSubsVar = context.newVar();
+ UnnestOperator unnestBrokerSubsVar =
+ new UnnestOperator(unnestedBrokerSubsVar, new MutableObject<>(scanBrokerSubsVar));
+ unnestBrokerSubsVar.getInputs().add(new MutableObject<>(op));
- }
- private DataSourceScanOperator moveScans(AbstractLogicalOperator opAboveScan, AbstractLogicalOperator op1,
- IOptimizationContext context, String subscriptionsName) throws AlgebricksException {
+ ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)), new MutableObject<>(
+ new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
- DataSourceScanOperator brokerScan = null;
- int i = 0;
- for (Mutable<ILogicalOperator> subOp : opAboveScan.getInputs()) {
- if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
- brokerScan = (DataSourceScanOperator) subOp.getValue();
- break;
- }
- i++;
- }
- if (brokerScan == null) {
- return null;
- }
+ AssignOperator assignBrokerEndPoint =
+ new AssignOperator(brokerEndpoint, new MutableObject<>(getBrokerEndPoint));
+ assignBrokerEndPoint.getInputs().add(new MutableObject<>(unnestBrokerSubsVar));
- AbstractLogicalOperator brokerSubcriptionsScan =
- (AbstractLogicalOperator) brokerScan.getInputs().get(0).getValue();
- if (!isSubscriptionsScan(brokerSubcriptionsScan, subscriptionsName, BADConstants.BrokerSubscriptionsType)) {
- return null;
- }
- opAboveScan.getInputs().set(i, brokerSubcriptionsScan.getInputs().get(0));
- context.computeAndSetTypeEnvironmentForOperator(opAboveScan);
+ ScalarFunctionCallExpression getBrokerSubId = new ScalarFunctionCallExpression(finfoGetField,
+ new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)),
+ new MutableObject<>(new ConstantExpression(
+ new AsterixConstantValue(new AString(BADConstants.BrokerSubscriptionId)))));
- brokerSubcriptionsScan.getInputs().set(0, op1.getInputs().get(0));
- op1.getInputs().set(0, new MutableObject<>(brokerScan));
- context.computeAndSetTypeEnvironmentForOperator(brokerSubcriptionsScan);
- context.computeAndSetTypeEnvironmentForOperator(brokerScan);
- context.computeAndSetTypeEnvironmentForOperator(op1);
- return brokerScan;
+
+ AssignOperator assignBrokerSubId = new AssignOperator(brokerSubId, new MutableObject<>(getBrokerSubId));
+ assignBrokerSubId.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
+
+
+ context.computeAndSetTypeEnvironmentForOperator(unnestBrokerSubsVar);
+ context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
+ context.computeAndSetTypeEnvironmentForOperator(assignBrokerSubId);
+
+ return assignBrokerSubId;
}
+
+
private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
IAType resultType) {
@@ -396,40 +320,9 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
}
- private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
- AbstractLogicalOperator opAboveBrokersScan) {
- Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
- new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
- DataSourceScanOperator brokerScan = null;
- int index = 0;
- for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
- if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
- brokerScan = (DataSourceScanOperator) subOp.getValue();
- break;
- }
- index++;
- }
- Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
- new VariableReferenceExpression(brokerScan.getVariables().get(2)));
-
- ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
- FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
- ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
- varArray.add(brokerEndpointVar);
- ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
- exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
-
- AssignOperator assignOp = new AssignOperator(varArray, exprArray);
-
- //Place assignOp between the scan and the op above it
- assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
- opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
-
- return assignOp;
- }
/*This function is used to find specific operators within the plan, either
- * 1. The brokers dataset scan
+ * 1. The assign for the record being inserted
* 2. The highest project of the plan
* 3. The subscriptions scan
* 4. Commit operator
@@ -443,8 +336,8 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
}
for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
if (searchId == 1) {
- if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
- return op;
+ if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+ return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
searchId, param1, param2);