You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/26 22:34:08 UTC

asterixdb-bad git commit: Pull mostly works, but stores the broker subs in the result (which we don't need)

Repository: asterixdb-bad
Updated Branches:
  refs/heads/resultsFinalVersion [created] d7e09a96c


Pull mostly works, but stores the broker subs in the result (which we don't need)


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/d7e09a96
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/d7e09a96
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/d7e09a96

Branch: refs/heads/resultsFinalVersion
Commit: d7e09a96cbfd02f3f982d13f42bc6a3a8b4ed6ce
Parents: ffc937a
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Tue Jun 26 15:33:18 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Tue Jun 26 15:33:18 2018 -0700

----------------------------------------------------------------------
 .../InsertBrokerNotifierForChannelRule.java     | 211 +++++--------------
 1 file changed, 52 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d7e09a96/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 1ef678d..6b91a8c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -28,7 +28,6 @@ import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
 import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -48,8 +47,8 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionC
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -61,7 +60,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -134,9 +134,6 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             channelName = datasetName.substring(0, datasetName.length() - 13);
         }
 
-        //get channelSubscriptionIdVar
-        LogicalVariable channelSubscriptionIdVar = subscriptionsScan.getVariables().get(0);
-
         //The channelExecutionTime is created just before the scan
         ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
         if (channelExecutionAssign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
@@ -147,52 +144,33 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             return false;
         }
 
-        if (!push) {
-            ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
-        }
-
         //move broker scan
-        AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "", "");
-        if (opAboveBrokersScan == null) {
+        SubplanOperator subplanOperator = (SubplanOperator) findOp(op, 1, "", "");
+        if (subplanOperator == null) {
             return false;
         }
-        DataSourceScanOperator brokerScan =
-                moveScans(opAboveBrokersScan, op1, context, channelName + "BrokerSubscriptions");
 
-        if (brokerScan == null) {
-            return false;
-        }
-        DataSourceScanOperator brokerSubscriptionScan =
-                (DataSourceScanOperator) brokerScan.getInputs().get(0).getValue();
-
-        //Add select to join subscriptions and broker and assign to get endpoint
-        LogicalVariable brokerDataverseVar = brokerScan.getVariables().get(0);
-        LogicalVariable brokerNameVar = brokerScan.getVariables().get(1);
-        LogicalVariable brokerVar = brokerScan.getVariables().get(2);
-        LogicalVariable brokerSubscriptionVar = brokerSubscriptionScan.getVariables().get(2);
-        LogicalVariable brokerSubscriptionIdVar = brokerSubscriptionScan.getVariables().get(1);
-        LogicalVariable brokerSubscriptionChannelIdVar = brokerSubscriptionScan.getVariables().get(0);
-
-        LogicalVariable brokerEndpointVar = context.newVar();
-        AssignOperator assign = createAssignAndSelect(brokerDataverseVar, brokerNameVar, brokerVar,
-                brokerSubscriptionVar, brokerSubscriptionIdVar, brokerEndpointVar, channelSubscriptionIdVar,
-                brokerSubscriptionChannelIdVar, context, op1);
+        LogicalVariable brokerEndpoint = context.newVar();
+        LogicalVariable brokerSubId = context.newVar();
+        LogicalVariable brokerSubsVar =
+                ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
+                        .getVariables().get(0);
+
+        AssignOperator newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context);
 
         context.computeAndSetTypeEnvironmentForOperator(op1);
 
         //Maybe we need to add a project???
         ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
         badProject.getVariables().add(channelExecutionVar);
-        badProject.getVariables().add(channelSubscriptionIdVar);
+        badProject.getVariables().add(brokerSubsVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
 
-
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
-                ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
-                        context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, brokerSubscriptionIdVar, channelExecutionVar, context,
-                        assign,
+                ? createNotifyBrokerPushPlan(brokerEndpoint, badProject.getVariables().get(0), channelExecutionVar,
+                        context, newAssign, (DistributeResultOperator) op1, channelDataverse, channelName)
+                : createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign,
                         (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
@@ -200,106 +178,52 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
         return true;
     }
 
-    private AssignOperator createAssignAndSelect(LogicalVariable brokerDataverseVar, LogicalVariable brokerNameVar,
-            LogicalVariable brokerVar, LogicalVariable brokerSubscriptionVar, LogicalVariable brokerSubscriptionIdVar,
-            LogicalVariable brokerEndpointVar, LogicalVariable channelSubscriptionIdVar,
-            LogicalVariable brokerSubscriptionChannelIdVar, IOptimizationContext context, AbstractLogicalOperator op1)
+    private AssignOperator createAssignsAndUnnest(LogicalVariable brokerSubsVar, LogicalVariable brokerEndpoint,
+            LogicalVariable brokerSubId, AbstractLogicalOperator op, IOptimizationContext context)
             throws AlgebricksException {
 
         FunctionInfo finfoGetField =
-                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_INDEX);
-        FunctionInfo finfoGetEquality = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.EQ);
-        FunctionInfo finfoGetAnd = (FunctionInfo) FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND);
-
-        //Create Select Operator
-        //The operator matches (A) the broker dataverse and name between broker subscriptions and brokers
-        // and (B) the channel subscription Id between broker subscriptions and channel subscriptions
-        ScalarFunctionCallExpression getBrokerName = new ScalarFunctionCallExpression(finfoGetField,
-                new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
-                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(3)))));
-        ScalarFunctionCallExpression getBrokerDataverse = new ScalarFunctionCallExpression(finfoGetField,
-                new MutableObject<>(new VariableReferenceExpression(brokerSubscriptionVar)),
-                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
-
-        VariableReferenceExpression BrokerBrokerNameReference = new VariableReferenceExpression(brokerNameVar);
-        VariableReferenceExpression BrokerBrokerDataverseReference =
-                new VariableReferenceExpression(brokerDataverseVar);
-        VariableReferenceExpression brokerSubscriptionChannelIdVarReference =
-                new VariableReferenceExpression(brokerSubscriptionChannelIdVar);
-
-        ScalarFunctionCallExpression brokerNameCheck = new ScalarFunctionCallExpression(finfoGetEquality,
-                new MutableObject<>(getBrokerName), new MutableObject<>(BrokerBrokerNameReference));
-        ScalarFunctionCallExpression brokerDataverseCheck = new ScalarFunctionCallExpression(finfoGetEquality,
-                new MutableObject<>(getBrokerDataverse), new MutableObject<>(BrokerBrokerDataverseReference));
-        ScalarFunctionCallExpression channelSubCheck = new ScalarFunctionCallExpression(finfoGetEquality,
-                new MutableObject<>(brokerSubscriptionChannelIdVarReference),
-                new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
-
-        ScalarFunctionCallExpression brokerAndExpression = new ScalarFunctionCallExpression(finfoGetAnd,
-                        new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
-
-        DataSourceScanOperator brokerScan = (DataSourceScanOperator) op1.getInputs().get(0).getValue();
-
-        SelectOperator selectSubscriptions = new SelectOperator(new MutableObject<>(channelSubCheck), false, null);
-        selectSubscriptions.getInputs().add(brokerScan.getInputs().get(0));
-
-        SelectOperator selectBrokers = new SelectOperator(new MutableObject<>(brokerAndExpression), false, null);
-        selectBrokers.getInputs().add(new MutableObject<>(brokerScan));
-
-        brokerScan.getInputs().set(0, new MutableObject<>(selectSubscriptions));
-
-        //Create Assign Operator
-        ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
-                new MutableObject<>(new VariableReferenceExpression(brokerVar)),
-                new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
-        AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
-        assign.getInputs().add(new MutableObject<>(selectBrokers));
-
-        op1.getInputs().set(0, new MutableObject<>(assign));
-        context.computeAndSetTypeEnvironmentForOperator(selectSubscriptions);
-        context.computeAndSetTypeEnvironmentForOperator(brokerScan);
-        context.computeAndSetTypeEnvironmentForOperator(selectBrokers);
-        context.computeAndSetTypeEnvironmentForOperator(assign);
-        context.computeAndSetTypeEnvironmentForOperator(op1);
+                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+        FunctionInfo finfoScanCollection =
+                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION);
 
-        return assign;
+        UnnestingFunctionCallExpression scanBrokerSubsVar = new UnnestingFunctionCallExpression(finfoScanCollection,
+                new MutableObject<>(new VariableReferenceExpression(brokerSubsVar)));
+        LogicalVariable unnestedBrokerSubsVar = context.newVar();
+        UnnestOperator unnestBrokerSubsVar =
+                new UnnestOperator(unnestedBrokerSubsVar, new MutableObject<>(scanBrokerSubsVar));
+        unnestBrokerSubsVar.getInputs().add(new MutableObject<>(op));
 
-    }
 
-    private DataSourceScanOperator moveScans(AbstractLogicalOperator opAboveScan, AbstractLogicalOperator op1,
-            IOptimizationContext context, String subscriptionsName) throws AlgebricksException {
+        ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)), new MutableObject<>(
+                        new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
 
-        DataSourceScanOperator brokerScan = null;
-        int i = 0;
-        for (Mutable<ILogicalOperator> subOp : opAboveScan.getInputs()) {
-            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
-                brokerScan = (DataSourceScanOperator) subOp.getValue();
-                break;
-            }
-            i++;
-        }
-        if (brokerScan == null) {
-            return null;
-        }
+        AssignOperator assignBrokerEndPoint =
+                new AssignOperator(brokerEndpoint, new MutableObject<>(getBrokerEndPoint));
+        assignBrokerEndPoint.getInputs().add(new MutableObject<>(unnestBrokerSubsVar));
 
-        AbstractLogicalOperator brokerSubcriptionsScan =
-                (AbstractLogicalOperator) brokerScan.getInputs().get(0).getValue();
 
-        if (!isSubscriptionsScan(brokerSubcriptionsScan, subscriptionsName, BADConstants.BrokerSubscriptionsType)) {
-            return null;
-        }
-        opAboveScan.getInputs().set(i, brokerSubcriptionsScan.getInputs().get(0));
-        context.computeAndSetTypeEnvironmentForOperator(opAboveScan);
+        ScalarFunctionCallExpression getBrokerSubId = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)),
+                new MutableObject<>(new ConstantExpression(
+                        new AsterixConstantValue(new AString(BADConstants.BrokerSubscriptionId)))));
 
-        brokerSubcriptionsScan.getInputs().set(0, op1.getInputs().get(0));
-        op1.getInputs().set(0, new MutableObject<>(brokerScan));
-        context.computeAndSetTypeEnvironmentForOperator(brokerSubcriptionsScan);
-        context.computeAndSetTypeEnvironmentForOperator(brokerScan);
-        context.computeAndSetTypeEnvironmentForOperator(op1);
-        return brokerScan;
+
+        AssignOperator assignBrokerSubId = new AssignOperator(brokerSubId, new MutableObject<>(getBrokerSubId));
+        assignBrokerSubId.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
+
+
+        context.computeAndSetTypeEnvironmentForOperator(unnestBrokerSubsVar);
+        context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
+        context.computeAndSetTypeEnvironmentForOperator(assignBrokerSubId);
+
+        return assignBrokerSubId;
 
     }
 
+
+
     private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
             LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
             IAType resultType) {
@@ -396,40 +320,9 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
 
     }
 
-    private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
-            AbstractLogicalOperator opAboveBrokersScan) {
-        Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
-                new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
-        DataSourceScanOperator brokerScan = null;
-        int index = 0;
-        for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
-            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
-                brokerScan = (DataSourceScanOperator) subOp.getValue();
-                break;
-            }
-            index++;
-        }
-        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
-                new VariableReferenceExpression(brokerScan.getVariables().get(2)));
-
-        ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
-                FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
-        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
-        varArray.add(brokerEndpointVar);
-        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
-        exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
-
-        AssignOperator assignOp = new AssignOperator(varArray, exprArray);
-
-        //Place assignOp between the scan and the op above it
-        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
-        opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
-
-        return assignOp;
-    }
 
     /*This function is used to find specific operators within the plan, either
-     * 1. The brokers dataset scan
+     * 1. The first subplan operator found (the caller reads the broker subscriptions variable from it)
      * 2. The highest project of the plan
      * 3. The subscriptions scan
      * 4. Commit operator
@@ -443,8 +336,8 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
         }
         for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
             if (searchId == 1) {
-                if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
-                    return op;
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                    return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
                             searchId, param1, param2);