You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/25 19:38:38 UTC

[7/7] asterixdb-bad git commit: Got pull channel plan fully working

Got pull channel plan fully working


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/9c4d808d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/9c4d808d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/9c4d808d

Branch: refs/heads/resultsNoReplicate
Commit: 9c4d808d964bafde84bb90e6d14497c45d8c81a1
Parents: 248ac3e
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Mon Jun 25 12:37:45 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Mon Jun 25 12:37:45 2018 -0700

----------------------------------------------------------------------
 .../InsertBrokerNotifierForChannelRule.java     | 22 +++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/9c4d808d/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 4ec6d2f..1ef678d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -191,7 +191,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
         DelegateOperator dOp = push
                 ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
                         context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+                : createNotifyBrokerPullPlan(brokerEndpointVar, brokerSubscriptionIdVar, channelExecutionVar, context,
                         assign,
                         (DistributeResultOperator) op1, channelDataverse, channelName);
 
@@ -235,22 +235,30 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
                 new MutableObject<>(brokerSubscriptionChannelIdVarReference),
                 new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
 
-        ScalarFunctionCallExpression andExpression =
-                new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+        ScalarFunctionCallExpression brokerAndExpression = new ScalarFunctionCallExpression(finfoGetAnd,
                         new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
 
-        SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
-        select.getInputs().addAll(op1.getInputs());
+        DataSourceScanOperator brokerScan = (DataSourceScanOperator) op1.getInputs().get(0).getValue();
+
+        SelectOperator selectSubscriptions = new SelectOperator(new MutableObject<>(channelSubCheck), false, null);
+        selectSubscriptions.getInputs().add(brokerScan.getInputs().get(0));
+
+        SelectOperator selectBrokers = new SelectOperator(new MutableObject<>(brokerAndExpression), false, null);
+        selectBrokers.getInputs().add(new MutableObject<>(brokerScan));
+
+        brokerScan.getInputs().set(0, new MutableObject<>(selectSubscriptions));
 
         //Create Assign Operator
         ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
                 new MutableObject<>(new VariableReferenceExpression(brokerVar)),
                 new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
         AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
-        assign.getInputs().add(new MutableObject<>(select));
+        assign.getInputs().add(new MutableObject<>(selectBrokers));
 
         op1.getInputs().set(0, new MutableObject<>(assign));
-        context.computeAndSetTypeEnvironmentForOperator(select);
+        context.computeAndSetTypeEnvironmentForOperator(selectSubscriptions);
+        context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+        context.computeAndSetTypeEnvironmentForOperator(selectBrokers);
         context.computeAndSetTypeEnvironmentForOperator(assign);
         context.computeAndSetTypeEnvironmentForOperator(op1);