You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/25 19:38:38 UTC
[7/7] asterixdb-bad git commit: Got pull channel plan fully working
Got pull channel plan fully working
Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/9c4d808d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/9c4d808d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/9c4d808d
Branch: refs/heads/resultsNoReplicate
Commit: 9c4d808d964bafde84bb90e6d14497c45d8c81a1
Parents: 248ac3e
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Mon Jun 25 12:37:45 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Mon Jun 25 12:37:45 2018 -0700
----------------------------------------------------------------------
.../InsertBrokerNotifierForChannelRule.java | 22 +++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/9c4d808d/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 4ec6d2f..1ef678d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -191,7 +191,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
DelegateOperator dOp = push
? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
context, assign, (DistributeResultOperator) op1, channelDataverse, channelName)
- : createNotifyBrokerPullPlan(brokerEndpointVar, channelSubscriptionIdVar, channelExecutionVar, context,
+ : createNotifyBrokerPullPlan(brokerEndpointVar, brokerSubscriptionIdVar, channelExecutionVar, context,
assign,
(DistributeResultOperator) op1, channelDataverse, channelName);
@@ -235,22 +235,30 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
new MutableObject<>(brokerSubscriptionChannelIdVarReference),
new MutableObject<>(new VariableReferenceExpression(channelSubscriptionIdVar)));
- ScalarFunctionCallExpression andExpression =
- new ScalarFunctionCallExpression(finfoGetAnd, new MutableObject<>(channelSubCheck),
+ ScalarFunctionCallExpression brokerAndExpression = new ScalarFunctionCallExpression(finfoGetAnd,
new MutableObject<>(brokerDataverseCheck), new MutableObject<>(brokerNameCheck));
- SelectOperator select = new SelectOperator(new MutableObject<>(andExpression), false, null);
- select.getInputs().addAll(op1.getInputs());
+ DataSourceScanOperator brokerScan = (DataSourceScanOperator) op1.getInputs().get(0).getValue();
+
+ SelectOperator selectSubscriptions = new SelectOperator(new MutableObject<>(channelSubCheck), false, null);
+ selectSubscriptions.getInputs().add(brokerScan.getInputs().get(0));
+
+ SelectOperator selectBrokers = new SelectOperator(new MutableObject<>(brokerAndExpression), false, null);
+ selectBrokers.getInputs().add(new MutableObject<>(brokerScan));
+
+ brokerScan.getInputs().set(0, new MutableObject<>(selectSubscriptions));
//Create Assign Operator
ScalarFunctionCallExpression getEndPoint = new ScalarFunctionCallExpression(finfoGetField,
new MutableObject<>(new VariableReferenceExpression(brokerVar)),
new MutableObject<>(new ConstantExpression(new AsterixConstantValue(new AInt32(2)))));
AssignOperator assign = new AssignOperator(brokerEndpointVar, new MutableObject<>(getEndPoint));
- assign.getInputs().add(new MutableObject<>(select));
+ assign.getInputs().add(new MutableObject<>(selectBrokers));
op1.getInputs().set(0, new MutableObject<>(assign));
- context.computeAndSetTypeEnvironmentForOperator(select);
+ context.computeAndSetTypeEnvironmentForOperator(selectSubscriptions);
+ context.computeAndSetTypeEnvironmentForOperator(brokerScan);
+ context.computeAndSetTypeEnvironmentForOperator(selectBrokers);
context.computeAndSetTypeEnvironmentForOperator(assign);
context.computeAndSetTypeEnvironmentForOperator(op1);