You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org> on 2018/06/29 19:37:35 UTC

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2731

Change subject: Move channels to a result sharing framework
......................................................................

Move channels to a result sharing framework

Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
M asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
A asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
39 files changed, 1,423 insertions(+), 777 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/31/2731/1

diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 0467f6e..db4fc2d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -19,7 +19,8 @@
 package org.apache.asterix.bad;
 
 public interface BADConstants {
-    String SubscriptionId = "subscriptionId";
+    String ChannelSubscriptionId = "channelSubId";
+    String BrokerSubscriptionId = "brokerSubId";
     String BrokerName = "BrokerName";
     String ChannelName = "ChannelName";
     String ProcedureName = "ProcedureName";
@@ -29,9 +30,11 @@
     String ResultId = "resultId";
     String ChannelExecutionTime = "channelExecutionTime";
     String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+    String BrokerSubscriptionsType = "BrokerSubscriptionsType";
     String ChannelResultsType = "ChannelResultsType";
     String ResultsDatasetName = "ResultsDatasetName";
-    String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+    String ChannelSubscriptionsDatasetName = "ChannelSubscriptionsDatasetName";
+    String BrokerSubscriptionsDatasetName = "BrokerSubscriptionsDatasetName";
     String CHANNEL_EXTENSION_NAME = "Channel";
     String PROCEDURE_KEYWORD = "Procedure";
     String BROKER_KEYWORD = "Broker";
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 8403fcc..5d9d971 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -142,7 +142,11 @@
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
             tempMdProvider.getLocks().reset();
             dropStmt = new DropDatasetStatement(new Identifier(dataverse),
-                    new Identifier(channel.getSubscriptionsDataset()), true);
+                    new Identifier(channel.getChannelSubscriptionsDataset()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
+            tempMdProvider.getLocks().reset();
+            dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getBrokerSubscriptionsDataset()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 7583f0b..c22a819 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -18,42 +18,30 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.util.ArrayList;
+import java.io.StringReader;
 import java.util.List;
 
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
-import org.apache.asterix.lang.common.expression.CallExpr;
-import org.apache.asterix.lang.common.expression.FieldAccessor;
-import org.apache.asterix.lang.common.expression.FieldBinding;
-import org.apache.asterix.lang.common.expression.LiteralExpr;
-import org.apache.asterix.lang.common.expression.RecordConstructor;
-import org.apache.asterix.lang.common.expression.VariableExpr;
-import org.apache.asterix.lang.common.literal.StringLiteral;
-import org.apache.asterix.lang.common.statement.InsertStatement;
-import org.apache.asterix.lang.common.statement.Query;
-import org.apache.asterix.lang.common.statement.UpsertStatement;
+import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -66,11 +54,13 @@
     private final Identifier brokerDataverseName;
     private final Identifier brokerName;
     private final List<Expression> argList;
+    private final List<String> paramList;
     private final String subscriptionId;
     private final int varCounter;
 
     public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
-            int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
+            int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId,
+            List<String> paramList) {
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.brokerDataverseName = brokerDataverseName;
@@ -78,6 +68,7 @@
         this.argList = argList;
         this.subscriptionId = subscriptionId;
         this.varCounter = varCounter;
+        this.paramList = paramList;
     }
 
     public Identifier getDataverseName() {
@@ -138,82 +129,39 @@
                 throw new AsterixException("There is no broker with this name " + brokerName + ".");
             }
 
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            String channelSubscriptionsDataset =
+                    channel.getChannelId().getDataverse() + "." + channel.getChannelSubscriptionsDataset();
 
-            if (argList.size() != channel.getFunction().getArity()) {
-                throw new AsterixException("Channel expected " + channel.getFunction().getArity()
-                        + " parameters but got " + argList.size());
-            }
-
-            Query subscriptionTuple = new Query(false);
-
-            List<FieldBinding> fb = new ArrayList<>();
-            LiteralExpr leftExpr = new LiteralExpr(new StringLiteral(BADConstants.DataverseName));
-            Expression rightExpr = new LiteralExpr(new StringLiteral(brokerDataverse));
-            fb.add(new FieldBinding(leftExpr, rightExpr));
-
-            leftExpr = new LiteralExpr(new StringLiteral(BADConstants.BrokerName));
-            rightExpr = new LiteralExpr(new StringLiteral(broker.getBrokerName()));
-            fb.add(new FieldBinding(leftExpr, rightExpr));
-
-            if (subscriptionId != null) {
-                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
-
-                List<Expression> UUIDList = new ArrayList<>();
-                UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
-                FunctionIdentifier function = BuiltinFunctions.UUID_CONSTRUCTOR;
-                FunctionSignature UUIDfunc =
-                        new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
-                CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
-
-                rightExpr = UUIDCall;
-                fb.add(new FieldBinding(leftExpr, rightExpr));
-            }
-
-            for (int i = 0; i < argList.size(); i++) {
-                leftExpr = new LiteralExpr(new StringLiteral("param" + i));
-                rightExpr = argList.get(i);
-                fb.add(new FieldBinding(leftExpr, rightExpr));
-            }
-            RecordConstructor recordCon = new RecordConstructor(fb);
-            subscriptionTuple.setBody(recordCon);
-            subscriptionTuple.setVarCounter(varCounter);
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse());
-            tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
+            String brokerSubscriptionsDataset =
+                    channel.getChannelId().getDataverse() + "." + channel.getBrokerSubscriptionsDataset();
 
             final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
             final IHyracksDataset hdc = requestParameters.getHyracksDataset();
             final Stats stats = requestParameters.getStats();
             if (subscriptionId == null) {
-                //To create a new subscription
-                VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
-                VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
-                useResultVar.setIsNewVar(false);
-                FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));
+                //Create a new subscription
+                if (argList.size() != channel.getFunction().getArity()) {
+                    throw new AsterixException("Channel expected " + channel.getFunction().getArity()
+                            + " parameters but got " + argList.size());
+                }
 
+                createChannelSubscription(statementExecutor, metadataProvider, hcc, hdc, stats,
+                        channelSubscriptionsDataset, resultDelivery);
+
+                metadataProvider.getLocks().reset();
                 metadataProvider.setResultSetId(new ResultSetId(resultSetId));
                 boolean resultsAsync =
                         resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
                 metadataProvider.setResultAsyncMode(resultsAsync);
-                tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
-                tempMdProvider.setResultAsyncMode(resultsAsync);
-                tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
-                tempMdProvider
-                        .setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
-                tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
-                tempMdProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
+                metadataProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
 
-                InsertStatement insert = new InsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc,
-                        resultDelivery, null, stats, false, null, null, null);
+                createBrokerSubscription(statementExecutor, metadataProvider, hcc, hdc, stats,
+                        channelSubscriptionsDataset, brokerSubscriptionsDataset, broker, resultDelivery);
+
             } else {
-                //To update an existing subscription
-                UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse),
-                        new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null);
-                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc,
-                        resultDelivery, null, stats, false, null, null, null);
+                //move subscription
+                moveBrokerSubscription(statementExecutor, metadataProvider, hcc, hdc, stats, brokerSubscriptionsDataset,
+                        broker, resultDelivery, subscriptionId);
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -226,4 +174,89 @@
 
     }
 
+    private void createChannelSubscription(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats, String channelSubscriptionsDataset,
+            ResultDelivery resultDelivery) throws Exception {
+        //TODO: Might be better to create the entire expression manually rather than parsing a string
+        StringBuilder builder = new StringBuilder();
+        builder.append("upsert into " + channelSubscriptionsDataset + "(\n");
+        builder.append("(let v = (select value s from " + channelSubscriptionsDataset + " s where ");
+        for (int i = 0; i < paramList.size(); i++) {
+            builder.append("param" + i + " =  " + paramList.get(i));
+            if (i < paramList.size() - 1) {
+                builder.append(" and ");
+            }
+        }
+        builder.append(")\n");
+        builder.append("select value (CASE (array_count(v) > 0)\n");
+        builder.append("WHEN true THEN {\"" + BADConstants.ChannelSubscriptionId + "\":v[0]."
+                + BADConstants.ChannelSubscriptionId + ", ");
+        for (int i = 0; i < paramList.size(); i++) {
+            builder.append("\"param" + i + "\": " + paramList.get(i));
+            if (i < paramList.size() - 1) {
+                builder.append(", ");
+            }
+        }
+        builder.append("}\n");
+        builder.append("ELSE {\"" + BADConstants.ChannelSubscriptionId + "\":create_uuid(), ");
+        for (int i = 0; i < paramList.size(); i++) {
+            builder.append("\"param" + i + "\": " + paramList.get(i));
+            if (i < paramList.size() - 1) {
+                builder.append(", ");
+            }
+        }
+        builder.append("}\n");
+        builder.append("END))\n");
+        builder.append(");");
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
+        ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc,
+                hdc, resultDelivery, null, stats, false, null, null, null);
+    }
+
+    private void createBrokerSubscription(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats, String channelSubscriptionsDataset,
+            String brokerSubscriptionDataset, Broker broker, ResultDelivery resultDelivery) throws Exception {
+        //TODO: Might be better to create the entire expression manually rather than parsing a string
+        StringBuilder builder = new StringBuilder();
+        builder.append("insert into " + brokerSubscriptionDataset + " as r (\n");
+        builder.append("(select value {\"" + BADConstants.ChannelSubscriptionId + "\":s."
+                + BADConstants.ChannelSubscriptionId + ",\"" + BADConstants.BrokerSubscriptionId
+                + "\":create_uuid(), \"" + BADConstants.DataverseName + "\":\"" + broker.getDataverseName() + "\", \""
+                + BADConstants.BrokerName + "\":\"" + broker.getBrokerName() + "\"}\n");
+        builder.append("from " + channelSubscriptionsDataset + " s where ");
+        for (int i = 0; i < paramList.size(); i++) {
+            builder.append("param" + i + " =  " + paramList.get(i));
+            if (i < paramList.size() - 1) {
+                builder.append(" and ");
+            }
+        }
+        builder.append(" limit 1)\n");
+        builder.append(") returning r." + BADConstants.BrokerSubscriptionId + ";");
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
+        ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc,
+                hdc, resultDelivery, null, stats, false, null, null, null);
+    }
+
+    private void moveBrokerSubscription(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats, String brokerSubscriptionDataset,
+            Broker broker, ResultDelivery resultDelivery, String subscriptionString) throws Exception {
+        //TODO: Might be better to create the entire expression manually rather than parsing a string
+        StringBuilder builder = new StringBuilder();
+        builder.append("upsert into " + brokerSubscriptionDataset + "(\n");
+        builder.append(
+                "(select value {\"" + BADConstants.ChannelSubscriptionId + "\":s." + BADConstants.ChannelSubscriptionId
+                        + ",\"" + BADConstants.BrokerSubscriptionId + "\":s." + BADConstants.BrokerSubscriptionId
+                        + ",\"" + BADConstants.DataverseName + "\":\"" + broker.getDataverseName() + "\", \""
+                        + BADConstants.BrokerName + "\":\"" + broker.getBrokerName() + "\"}\n");
+        builder.append("from " + brokerSubscriptionDataset + " s where ");
+        builder.append("s." + BADConstants.BrokerSubscriptionId + " = uuid(\"" + subscriptionString + "\"))\n");
+        builder.append(");");
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
+        ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc,
+                hdc, resultDelivery, null, stats, false, null, null, null);
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index b23bf3b..86041b7 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -113,11 +113,11 @@
                 throw new AsterixException("There is no channel with this name " + channelName + ".");
             }
 
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            String subscriptionsDatasetName = channel.getBrokerSubscriptionsDataset();
 
             //Need a condition to say subscription-id = sid
             OperatorExpr condition = new OperatorExpr();
-            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.BrokerSubscriptionId));
             condition.addOperand(fa);
             condition.setCurrentop(true);
             condition.addOperator("=");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index a28666a..5813599 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -85,7 +85,8 @@
     private Identifier dataverseName;
     private String duration;
     private String body;
-    private String subscriptionsTableName;
+    private String channelSubscriptionsTableName;
+    private String brokerSubscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
     private final boolean push;
@@ -113,7 +114,7 @@
     }
 
     public String getSubscriptionsName() {
-        return subscriptionsTableName;
+        return channelSubscriptionsTableName;
     }
 
     public String getDuration() {
@@ -159,29 +160,53 @@
     private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
 
+        //Create channel subscriptions dataset
         Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
         Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
         //Setup the subscriptions dataset
         List<List<String>> partitionFields = new ArrayList<>();
         List<Integer> keyIndicators = new ArrayList<>();
         keyIndicators.add(0);
-        List<String> fieldNames = new ArrayList<>();
-        fieldNames.add(BADConstants.SubscriptionId);
-        partitionFields.add(fieldNames);
-        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
-        DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
-                new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
-                new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+        List<String> fieldName = new ArrayList<>();
+        fieldName.add(BADConstants.ChannelSubscriptionId);
+        partitionFields.add(fieldName);
+        IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+        DatasetDecl createChannelSubscriptionsDataset =
+                new DatasetDecl(dataverseName, new Identifier(channelSubscriptionsTableName),
+                        new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
 
-        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+                createChannelSubscriptionsDataset, hcc, null);
+
+        //Create broker subscriptions dataset
+        Identifier brokerSubscriptionsTypeName = new Identifier(BADConstants.BrokerSubscriptionsType);
+        partitionFields = new ArrayList<>();
+        keyIndicators = new ArrayList<>();
+        keyIndicators.add(0);
+        keyIndicators.add(0);
+        fieldName = new ArrayList<>();
+        List<String> fieldName2 = new ArrayList<>();
+        fieldName.add(BADConstants.ChannelSubscriptionId);
+        fieldName2.add(BADConstants.BrokerSubscriptionId);
+        partitionFields.add(fieldName);
+        partitionFields.add(fieldName2);
+        idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+        DatasetDecl createBrokerSubscriptionsDataset =
+                new DatasetDecl(dataverseName, new Identifier(brokerSubscriptionsTableName),
+                        new Identifier(BADConstants.BAD_DATAVERSE_NAME), brokerSubscriptionsTypeName, null, null, null,
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+        metadataProvider.getLocks().reset();
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+                createBrokerSubscriptionsDataset,
                 hcc, null);
 
         if (!push) {
             //Setup the results dataset
             partitionFields = new ArrayList<>();
-            fieldNames = new ArrayList<>();
-            fieldNames.add(BADConstants.ResultId);
-            partitionFields.add(fieldNames);
+            fieldName = new ArrayList<>();
+            fieldName.add(BADConstants.ResultId);
+            partitionFields.add(fieldName);
             idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
             DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
                     new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
@@ -216,31 +241,97 @@
 
     private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
+        /*
+            pull version:
+            set inline_with "false";
+            insert into channels.ChannelNameResults as a (
+            with channelExecutionTime as current_datetime()
+            select result, channelExecutionTime, sub.channelSubId as channelSubId,current_datetime() as deliveryTime,
+            (select b.BrokerEndPoint, bs.brokerSubId from
+            channels.ChannelNameBrokerSubscriptions bs,
+            Metadata.`Broker` b
+            where bs.BrokerName = b.BrokerName
+            and bs.DataverseName = b.DataverseName
+            and bs.channelSubId = sub.channelSubId
+            ) as brokerSubIds
+            from channels.ChannelNameChannelSubscriptions sub,
+            channels.ChannelFunctionName(sub.param0, sub.param1, ...) result
+            ) returning a;
+        
+            push version:
+            set inline_with "false";
+            select value {"payload": {"result":result, "subscriptionIds": brokerSubIds}} from (
+            with channelExecutionTime as current_datetime()
+            select b.BrokerEndPoint, result, bs.brokerSubId as brokerSubId, channelExecutionTime
+            from channels.EmergenciesNearMeChannelChannelSubscriptions sub,
+            channels.EmergenciesNearMe(sub.param0,sub.param1) result
+            channels.EmergenciesNearMeChannelBrokerSubscriptions bs,
+            Metadata.Broker b
+            where bs.BrokerName = b.BrokerName
+            and bs.DataverseName = b.DataverseName
+            and bs.channelSubId = sub.channelSubId
+            ) results
+            group by BrokerEndPoint,result, channelExecutionTime group as brokerSubIds (brokerSubId as subscriptionId);
+         */
         StringBuilder builder = new StringBuilder();
         builder.append("SET inline_with \"false\";\n");
         if (!push) {
             builder.append("insert into " + dataverse + "." + resultsTableName);
             builder.append(" as a (\n");
+
+            builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
+            builder.append("select result, ");
+            builder.append(BADConstants.ChannelExecutionTime + ", ");
+            builder.append(
+                    "sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
+            builder.append("current_datetime() as " + BADConstants.DeliveryTime + ",\n");
+
+            builder.append("(select b." + BADConstants.BrokerEndPoint + ", bs." + BADConstants.BrokerSubscriptionId
+                    + " from\n");
+        } else {
+            builder.append(
+                    "select value {\"payload\": {\"result\":result, \"subscriptionIds\": brokerSubIds}} from (\n");
+            builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
+            builder.append(
+                    "select b." + BADConstants.BrokerEndPoint + ", result, bs." + BADConstants.BrokerSubscriptionId);
+            builder.append(
+                    " as " + BADConstants.BrokerSubscriptionId + ", " + BADConstants.ChannelExecutionTime + "\n");
+            builder.append("from " + dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+            builder.append(function.getNamespace() + "." + function.getName() + "(");
+            int i = 0;
+            for (; i < function.getArity() - 1; i++) {
+                builder.append("sub.param" + i + ",");
+            }
+            builder.append("sub.param" + i + ") result,\n");
+
         }
-        builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
-        builder.append("select result, ");
-        builder.append(BADConstants.ChannelExecutionTime + ", ");
-        builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
-        builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
-        builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
-        builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
-        builder.append(function.getNamespace() + "." + function.getName() + "(");
-        int i = 0;
-        for (; i < function.getArity() - 1; i++) {
-            builder.append("sub.param" + i + ",");
-        }
-        builder.append("sub.param" + i + ") result \n");
-        builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
-        builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
+        builder.append(dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
+        builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b\n");
+
+        builder.append("where bs." + BADConstants.BrokerName + " = b." + BADConstants.BrokerName + "\n");
+        builder.append("and bs." + BADConstants.DataverseName + " = b." + BADConstants.DataverseName + "\n");
+        builder.append(
+                "and bs." + BADConstants.ChannelSubscriptionId + " = sub." + BADConstants.ChannelSubscriptionId + "\n");
+
         if (!push) {
+            builder.append(") as brokerSubIds\n");
+
+            builder.append("from " + dataverse + "." + channelSubscriptionsTableName + " sub,\n");
+            builder.append(function.getNamespace() + "." + function.getName() + "(");
+            int i = 0;
+            for (; i < function.getArity() - 1; i++) {
+                builder.append("sub.param" + i + ",");
+            }
+            builder.append("sub.param" + i + ") result \n");
             builder.append(")");
             builder.append(" returning a");
+
+        } else {
+            builder.append(") results\n");
+            builder.append("group by " + BADConstants.BrokerEndPoint + ",result, " + BADConstants.ChannelExecutionTime);
+            builder.append(" group as brokerSubIds (" + BADConstants.BrokerSubscriptionId + " as subscriptionId)");
         }
+
         builder.append(";");
         body = builder.toString();
         BADParserFactory factory = new BADParserFactory();
@@ -269,7 +360,9 @@
 
         dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
         dataverse = dataverseName.getValue();
-        subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
+        channelSubscriptionsTableName =
+                channelName + BADConstants.CHANNEL_EXTENSION_NAME + BADConstants.subscriptionEnding;
+        brokerSubscriptionsTableName = channelName + BADConstants.BROKER_KEYWORD + BADConstants.subscriptionEnding;
         resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
 
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
@@ -297,7 +390,10 @@
             initialize(mdTxnCtx);
 
             //check if names are available before creating anything
-            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, channelSubscriptionsTableName) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, brokerSubscriptionsTableName) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
             if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
@@ -323,7 +419,8 @@
 
             BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
                     duration);
-            channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+            channel = new Channel(dataverse, channelName.getValue(), channelSubscriptionsTableName,
+                    brokerSubscriptionsTableName, resultsTableName, function,
                     duration, null, body);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd2ff86..5be2e84 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -47,8 +47,12 @@
     public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
             NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
 
-    public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+    public static final Datatype BAD_CHANNEL_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
             BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+
+    public static final Datatype BAD_BROKER_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.BrokerSubscriptionsType, BADMetadataRecordTypes.brokerSubscriptionsType, false);
+
     public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
             BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
 
@@ -76,7 +80,6 @@
         return new MetadataTupleTranslatorProvider();
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public List<ExtensionMetadataDataset> getExtensionIndexes() {
         try {
@@ -107,7 +110,8 @@
                 // MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
                 // insert default data type
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_SUBSCRIPTION_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index a764a5a..f6e2e1f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -27,16 +27,24 @@
 
 public class BADMetadataRecordTypes {
 
-    // -------------------------------------- Subscriptions --------------------------------------//
-    private static final String[] subTypeFieldNames =
-            { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.SubscriptionId };
-    private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+    // -------------------------------------- Channel Subscriptions --------------------------------------//
+    private static final String[] channelSubTypeFieldNames = { BADConstants.ChannelSubscriptionId };
+    private static final IAType[] channelSubTypeFieldTypes = { BuiltinType.AUUID };
     public static final ARecordType channelSubscriptionsType =
-            new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true);
+            new ARecordType(BADConstants.ChannelSubscriptionsType, channelSubTypeFieldNames, channelSubTypeFieldTypes,
+                    true);
+
+    // -------------------------------------- Broker Subscriptions --------------------------------------//
+    private static final String[] brokerSubTypeFieldNames = { BADConstants.ChannelSubscriptionId,
+            BADConstants.BrokerSubscriptionId, BADConstants.DataverseName, BADConstants.BrokerName };
+    private static final IAType[] brokerSubTypeFieldTypes =
+            { BuiltinType.AUUID, BuiltinType.AUUID, BuiltinType.ASTRING, BuiltinType.ASTRING };
+    public static final ARecordType brokerSubscriptionsType = new ARecordType(BADConstants.BrokerSubscriptionsType,
+            brokerSubTypeFieldNames, brokerSubTypeFieldTypes, true);
 
     // ---------------------------------------- Results --------------------------------------------//
     private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
-            BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+            BADConstants.ChannelSubscriptionId, BADConstants.DeliveryTime };
     private static final IAType[] resultTypeFieldTypes =
             { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, BuiltinType.ADATETIME };
     public static final ARecordType channelResultsType =
@@ -45,25 +53,28 @@
     //------------------------------------------ Channel ----------------------------------------//     
     public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
-    public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+    public static final int CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
     public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
     public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
     public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
+    public static final int CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX = 8;
     public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_CHANNEL,
             // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+            new String[] { BADConstants.DataverseName, BADConstants.ChannelName,
+                    BADConstants.ChannelSubscriptionsDatasetName,
                     BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
+                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY,
+                    BADConstants.BrokerSubscriptionsDatasetName },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
                     new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
                             null),
-                    BuiltinType.ASTRING },
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
             //IsOpen?
             true);
     //------------------------------------------ Broker ----------------------------------------//
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index ed9346c..cc0d81e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -34,7 +34,8 @@
 
     /** A unique identifier for the channel */
     protected final EntityId channelId;
-    private final String subscriptionsDatasetName;
+    private final String channelSubscriptionsDatasetName;
+    private final String brokerSubscriptionsDatasetName;
     private final String resultsDatasetName;
     private final String duration;
     private final String channelBody;
@@ -49,13 +50,15 @@
     */
     private final List<List<List<String>>> dependencies;
 
-    public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+    public Channel(String dataverseName, String channelName, String channelSubscriptionsDatasetName,
+            String brokerSubscriptionsDatasetName, String resultsDataset,
             FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
         this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
-        this.subscriptionsDatasetName = subscriptionsDataset;
+        this.channelSubscriptionsDatasetName = channelSubscriptionsDatasetName;
+        this.brokerSubscriptionsDatasetName = brokerSubscriptionsDatasetName;
         this.channelBody = channelBody;
         if (this.function.getNamespace() == null) {
             this.function.setNamespace(dataverseName);
@@ -67,9 +70,11 @@
             this.dependencies.add(new ArrayList<>());
             this.dependencies.add(new ArrayList<>());
             List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName);
-            List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName);
+            List<String> channelSubscriptionList = Arrays.asList(dataverseName, channelSubscriptionsDatasetName);
+            List<String> brokerSubscriptionList = Arrays.asList(dataverseName, brokerSubscriptionsDatasetName);
             this.dependencies.get(0).add(resultsList);
-            this.dependencies.get(0).add(subscriptionList);
+            this.dependencies.get(0).add(channelSubscriptionList);
+            this.dependencies.get(0).add(brokerSubscriptionList);
             this.dependencies.get(1).add(functionAsPath);
         } else {
             this.dependencies = dependencies;
@@ -84,8 +89,12 @@
         return dependencies;
     }
 
-    public String getSubscriptionsDataset() {
-        return subscriptionsDatasetName;
+    public String getChannelSubscriptionsDataset() {
+        return channelSubscriptionsDatasetName;
+    }
+
+    public String getBrokerSubscriptionsDataset() {
+        return brokerSubscriptionsDatasetName;
     }
 
     public String getResultsDatasetName() {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 175280e..6cf7f50 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -82,8 +82,9 @@
         String channelName =
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX))
                         .getStringValue();
-        String subscriptionsName = ((AString) channelRecord
-                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+        String channelSubscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+                        .getStringValue();
         String resultsName =
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX))
                         .getStringValue();
@@ -127,10 +128,15 @@
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
                         .getStringValue();
 
+        String brokerSubscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+                        .getStringValue();
+
         FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
                 Integer.parseInt(functionSignature.get(2)));
 
-        channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
+        channel = new Channel(dataverseName, channelName, channelSubscriptionsName, brokerSubscriptionsName,
+                resultsName, signature, duration,
                 dependencies, channelBody);
         return channel;
     }
@@ -164,9 +170,10 @@
 
         // write field 2
         fieldValue.reset();
-        aString.setValue(channel.getSubscriptionsDataset());
+        aString.setValue(channel.getChannelSubscriptionsDataset());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+                fieldValue);
 
         // write field 3
         fieldValue.reset();
@@ -227,6 +234,13 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
 
+        // write field 8
+        fieldValue.reset();
+        aString.setValue(channel.getBrokerSubscriptionsDataset());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+                fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..e5e58af 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.FunctionInfo;
 import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -46,6 +47,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -58,6 +60,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -76,25 +80,26 @@
         if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
-        boolean push = false;
 
-        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
-            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
-                return false;
-            }
+        boolean push = false;
+        AbstractLogicalOperator op = findOp(op1, 4, "", "");
+        if (op == null) {
             push = true;
         }
+
+
         DataSourceScanOperator subscriptionsScan;
         String channelDataverse;
         String channelName;
+        AssignOperator pushAssign = null;
+        AssignOperator newAssign = null;
+        GroupByOperator pushGroupBy = null;
+        LogicalVariable brokerSubsVar = null;
+        LogicalVariable brokerEndpoint = null;
+        LogicalVariable brokerSubId = null;
 
         if (!push) {
-            DelegateOperator eOp = (DelegateOperator) op;
-            if (!(eOp.getDelegate() instanceof CommitOperator)) {
-                return false;
-            }
-            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
             if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                 return false;
             }
@@ -113,15 +118,34 @@
             //Now we know that we are inserting into results
 
             channelName = datasetName.substring(0, datasetName.length() - 7);
-            String subscriptionsName = channelName + "Subscriptions";
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+            String subscriptionsName = channelName + "ChannelSubscriptions";
+            subscriptionsScan =
+                    (DataSourceScanOperator) findOp(op, 3, subscriptionsName, BADConstants.ChannelSubscriptionsType);
             if (subscriptionsScan == null) {
                 return false;
             }
 
         } else {
+            op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+                return false;
+            }
+            ProjectOperator pushProject = (ProjectOperator) op;
+
+            AbstractLogicalOperator op2 = (AbstractLogicalOperator) pushProject.getInputs().get(0).getValue();
+            if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+                return false;
+            }
+            pushAssign = (AssignOperator) op2;
+
+            AbstractLogicalOperator op3 = (AbstractLogicalOperator) pushAssign.getInputs().get(0).getValue();
+            if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) {
+                return false;
+            }
+            pushGroupBy = (GroupByOperator) op3;
+
             //if push, get the channel name here instead
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "", BADConstants.ChannelSubscriptionsType);
             if (subscriptionsScan == null) {
                 return false;
             }
@@ -129,17 +153,8 @@
             String datasetName = dds.getDataset().getDatasetName();
             channelDataverse = dds.getDataset().getDataverseName();
             channelName = datasetName.substring(0, datasetName.length() - 13);
+            brokerEndpoint = pushGroupBy.getGroupByList().get(0).first;
         }
-
-        //Now we need to get the broker EndPoint
-        LogicalVariable brokerEndpointVar = context.newVar();
-        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
-        if (opAboveBrokersScan == null) {
-            return false;
-        }
-
-        //get subscriptionIdVar
-        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
 
         //The channelExecutionTime is created just before the scan
         ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -152,34 +167,85 @@
         }
 
         if (!push) {
-            ((CommitOperator) ((DelegateOperator) op).getDelegate()).setSink(false);
+            //move broker scan
+            SubplanOperator subplanOperator = (SubplanOperator) findOp(op, 1, "", "");
+            if (subplanOperator == null) {
+                return false;
+            }
+            brokerEndpoint = context.newVar();
+            brokerSubId = context.newVar();
+            brokerSubsVar = ((AggregateOperator) subplanOperator.getNestedPlans().get(0).getRoots().get(0).getValue())
+                    .getVariables().get(0);
+
+            newAssign = createAssignsAndUnnest(brokerSubsVar, brokerEndpoint, brokerSubId, op, context);
+
+            context.computeAndSetTypeEnvironmentForOperator(op1);
+        } else {
+            channelExecutionVar = pushGroupBy.getGroupByList().get(2).first;
         }
-
-        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
-        //now brokerNameVar holds the brokerName for use farther up in the plan
-
-        context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
-        context.computeAndSetTypeEnvironmentForOperator(op);
-
-        ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
-        badProject.getVariables().add(subscriptionIdVar);
-        badProject.getVariables().add(brokerEndpointVar);
+        //Maybe we need to add a project???
+        ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "", "");
         badProject.getVariables().add(channelExecutionVar);
+        badProject.getVariables().add(push ? brokerEndpoint : brokerSubsVar);
         context.computeAndSetTypeEnvironmentForOperator(badProject);
-
 
         //Create my brokerNotify plan above the extension Operator
         DelegateOperator dOp = push
-                ? createNotifyBrokerPushPlan(brokerEndpointVar, badProject.getVariables().get(0), channelExecutionVar,
-                        context, op, (DistributeResultOperator) op1, channelDataverse, channelName)
-                : createNotifyBrokerPullPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, context, op,
+                ? createNotifyBrokerPushPlan(brokerEndpoint, channelExecutionVar, context, pushAssign, channelDataverse,
+                        channelName)
+                : createNotifyBrokerPullPlan(brokerEndpoint, brokerSubId, channelExecutionVar, context, newAssign,
                         (DistributeResultOperator) op1, channelDataverse, channelName);
 
         opRef.setValue(dOp);
 
         return true;
     }
+
+    private AssignOperator createAssignsAndUnnest(LogicalVariable brokerSubsVar, LogicalVariable brokerEndpoint,
+            LogicalVariable brokerSubId, AbstractLogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+
+        FunctionInfo finfoGetField =
+                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+        FunctionInfo finfoScanCollection =
+                (FunctionInfo) FunctionUtil.getFunctionInfo(BuiltinFunctions.SCAN_COLLECTION);
+
+        UnnestingFunctionCallExpression scanBrokerSubsVar = new UnnestingFunctionCallExpression(finfoScanCollection,
+                new MutableObject<>(new VariableReferenceExpression(brokerSubsVar)));
+        LogicalVariable unnestedBrokerSubsVar = context.newVar();
+        UnnestOperator unnestBrokerSubsVar =
+                new UnnestOperator(unnestedBrokerSubsVar, new MutableObject<>(scanBrokerSubsVar));
+        unnestBrokerSubsVar.getInputs().add(new MutableObject<>(op));
+
+
+        ScalarFunctionCallExpression getBrokerEndPoint = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)), new MutableObject<>(
+                        new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))));
+
+        AssignOperator assignBrokerEndPoint =
+                new AssignOperator(brokerEndpoint, new MutableObject<>(getBrokerEndPoint));
+        assignBrokerEndPoint.getInputs().add(new MutableObject<>(unnestBrokerSubsVar));
+
+
+        ScalarFunctionCallExpression getBrokerSubId = new ScalarFunctionCallExpression(finfoGetField,
+                new MutableObject<>(new VariableReferenceExpression(unnestedBrokerSubsVar)),
+                new MutableObject<>(new ConstantExpression(
+                        new AsterixConstantValue(new AString(BADConstants.BrokerSubscriptionId)))));
+
+
+        AssignOperator assignBrokerSubId = new AssignOperator(brokerSubId, new MutableObject<>(getBrokerSubId));
+        assignBrokerSubId.getInputs().add(new MutableObject<>(assignBrokerEndPoint));
+
+
+        context.computeAndSetTypeEnvironmentForOperator(unnestBrokerSubsVar);
+        context.computeAndSetTypeEnvironmentForOperator(assignBrokerEndPoint);
+        context.computeAndSetTypeEnvironmentForOperator(assignBrokerSubId);
+
+        return assignBrokerSubId;
+
+    }
+
+
 
     private DelegateOperator createBrokerOp(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
             LogicalVariable channelExecutionVar, String channelDataverse, String channelName, boolean push,
@@ -194,23 +260,19 @@
         return extensionOp;
     }
 
-    private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar, LogicalVariable sendVar,
-            LogicalVariable channelExecutionVar, IOptimizationContext context, ILogicalOperator eOp,
-            DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+    private DelegateOperator createNotifyBrokerPushPlan(LogicalVariable brokerEndpointVar,
+            LogicalVariable channelExecutionVar, IOptimizationContext context, AssignOperator payLoadAssign,
+            String channelDataverse, String channelName)
             throws AlgebricksException {
-        //Find the assign operator to get the result type that we need
-        AbstractLogicalOperator assign = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
-        while (assign.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-            assign = (AbstractLogicalOperator) assign.getInputs().get(0).getValue();
-        }
-        IVariableTypeEnvironment env = assign.computeOutputTypeEnvironment(context);
-        IAType resultType = (IAType) env.getVarType(sendVar);
+        IVariableTypeEnvironment env = payLoadAssign.computeOutputTypeEnvironment(context);
+        IAType resultType = (IAType) env.getVarType(payLoadAssign.getVariables().get(0));
 
         //Create the NotifyBrokerOperator
-        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, sendVar, channelExecutionVar, channelDataverse,
+        DelegateOperator extensionOp = createBrokerOp(brokerEndpointVar, payLoadAssign.getVariables().get(0),
+                channelExecutionVar, channelDataverse,
                 channelName, true, resultType);
 
-        extensionOp.getInputs().add(new MutableObject<>(eOp));
+        extensionOp.getInputs().add(new MutableObject<>(payLoadAssign));
         context.computeAndSetTypeEnvironmentForOperator(extensionOp);
 
         return extensionOp;
@@ -277,82 +339,75 @@
 
     }
 
-    private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
-            AbstractLogicalOperator opAboveBrokersScan) {
-        Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
-                new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
-        DataSourceScanOperator brokerScan = null;
-        int index = 0;
-        for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
-            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
-                brokerScan = (DataSourceScanOperator) subOp.getValue();
-                break;
-            }
-            index++;
-        }
-        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
-                new VariableReferenceExpression(brokerScan.getVariables().get(2)));
-
-        ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
-                FunctionUtil.getFunctionInfo(BuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
-        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
-        varArray.add(brokerEndpointVar);
-        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
-        exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
-
-        AssignOperator assignOp = new AssignOperator(varArray, exprArray);
-
-        //Place assignOp between the scan and the op above it
-        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
-        opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
-
-        return assignOp;
-    }
 
     /*This function is used to find specific operators within the plan, either
-     * A. The brokers dataset scan
-     * B. The subscriptions scan
-     * C. The highest project of the plan
+     * 1. The assign for the record being inserted
+     * 2. The highest project of the plan
+     * 3. The subscriptions scan
+     * 4. Commit operator
+     *
+     * param1 is the name of the expected subscriptions dataset when searching for a subscriptions scan
+     * and param2 is the type of the subscriptions dataset (channel or broker)
      */
-    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
         if (!op.hasInputs()) {
             return null;
         }
         for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
-            if (lookingForString.equals("brokers")) {
-                if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
-                    return op;
+            if (searchId == 1) {
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                    if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                        ScalarFunctionCallExpression resultCreation =
+                                (ScalarFunctionCallExpression) ((AssignOperator) op).getExpressions().get(0).getValue();
+
+                        resultCreation.getArguments().remove(resultCreation.getArguments().size() - 1);
+                        resultCreation.getArguments().remove(resultCreation.getArguments().size() - 1);
+                        return (AbstractLogicalOperator) subOp.getValue();
+                    }
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
 
-            } else if (lookingForString.equals("project")) {
+            } else if (searchId == 2) {
                 if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
             }
 
-            else {
-                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+            else if (searchId == 3) {
+                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param1, param2)) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param1, param2);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
 
+            } else if (searchId == 4) {
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator dOp = (DelegateOperator) subOp.getValue();
+                    if (dOp.getDelegate() instanceof CommitOperator) {
+                        return (AbstractLogicalOperator) subOp.getValue();
+                    }
+                } else {
+                    AbstractLogicalOperator nestedOp =
+                            findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param1, param2);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
             }
         }
         return null;
@@ -371,12 +426,12 @@
         return false;
     }
 
-    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName, String subscriptionType) {
         if (op instanceof DataSourceScanOperator) {
             if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
                 if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
-                        && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+                        && dds.getDataset().getItemTypeName().equals(subscriptionType)) {
                     if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
                         return true;
                     }
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 7456cef..9e6491e 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -226,21 +226,40 @@
   String id = null;
   String subscriptionId = null;
   Pair<Identifier,Identifier> brokerName = null;
+  Token beginPos;
+  Token endPos;
+  String paramString;
+  List<String> paramList = new ArrayList<String>();
 }
 {
   (
   "subscribe" <TO> nameComponents = QualifiedName()
-   <LEFTPAREN> (tmp = Expression()
+   <LEFTPAREN>
+   (
    {
-      argList.add(tmp);
+   beginPos = token;
    }
-   (<COMMA> tmp = Expression()
+   tmp = Expression()
    {
       argList.add(tmp);
+      endPos = token;
+      paramString = extractFragment(beginPos.endLine, beginPos.endColumn, endPos.endLine, endPos.endColumn+1);
+      paramList.add(paramString);
+   }
+   (<COMMA>
+   {
+   beginPos = token;
+   }
+    tmp = Expression()
+   {
+      argList.add(tmp);
+      endPos = token;
+      paramString = extractFragment(beginPos.endLine, beginPos.endColumn, endPos.endLine, endPos.endColumn+1);
+      paramList.add(paramString);
    }
    )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
    {
-      stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+      stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId, paramList);
    }
    | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
       {
@@ -251,19 +270,9 @@
         getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
         stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter());
       }
-     | "change" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents = QualifiedName()
-       <LEFTPAREN> (tmp = Expression()
-       {
-         argList.add(tmp);
-       }
-       (<COMMA> tmp = Expression()
-       {
-         argList.add(tmp);
-       }
-       )*)? <RIGHTPAREN>
-        <TO> brokerName = QualifiedName()
+     | "move" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents = QualifiedName() <TO> brokerName = QualifiedName()
       {
-        stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+        stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId, paramList);
       }
     )
     {
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
index 308c8b1..449a251 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
@@ -10,121 +10,162 @@
           -- HASH_PARTITION_EXCHANGE [$$141, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$116]  |PARTITIONED|
+                -- STABLE_SORT [$$142(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$142]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- COMMIT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INDEX_INSERT_DELETE  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- COMMIT  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$111]  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
+                                    -- INDEX_INSERT_DELETE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- INSERT_DELETE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$129]  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- HYBRID_HASH_JOIN [$$126][$$125]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$126]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- PRE_CLUSTERED_GROUP_BY[$$169]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$124, $$122][$$117, $$118]  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$124, $$122]  |PARTITIONED|
+                                                                      -- STABLE_SORT [$$169(ASC)]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                    -- ASSIGN  |UNPARTITIONED|
-                                                                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$125]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$119][$$135]  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- BTREE_SEARCH  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- STABLE_SORT [$$145(ASC)]  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- HYBRID_HASH_JOIN [$$134][$$137]  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$134]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- UNNEST  |PARTITIONED|
                                                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- BTREE_SEARCH  |PARTITIONED|
+                                                                                            -- PRE_CLUSTERED_GROUP_BY[$$166]  |PARTITIONED|
+                                                                                                    {
+                                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                    }
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- ASSIGN  |PARTITIONED|
-                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
-                                                                          -- NESTED_LOOP  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- UNION_ALL  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- BTREE_SEARCH  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- SPLIT  |PARTITIONED|
-                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                -- STABLE_SORT [$$166(ASC)]  |PARTITIONED|
+                                                                                                  -- HASH_PARTITION_EXCHANGE [$$166]  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                        -- ASSIGN  |PARTITIONED|
-                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- SPLIT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                        -- HYBRID_HASH_JOIN [$$146][$$145]  |PARTITIONED|
+                                                                                                          -- HASH_PARTITION_EXCHANGE [$$146]  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                          -- HASH_PARTITION_EXCHANGE [$$145]  |PARTITIONED|
+                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- HYBRID_HASH_JOIN [$$135][$$163]  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- STABLE_SORT [$$174(ASC)]  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                    -- HASH_PARTITION_EXCHANGE [$$163]  |PARTITIONED|
+                                                                                                                      -- NESTED_LOOP  |PARTITIONED|
+                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                          -- UNION_ALL  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                    -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                        -- SPLIT  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- SPLIT  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                          -- UNION_ALL  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- SPLIT  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- SPLIT  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$137]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- HYBRID_HASH_JOIN [$$149, $$151][$$139, $$140]  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$149, $$151]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                              -- UNION_ALL  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- STREAM_SELECT  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- BTREE_SEARCH  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- SPLIT  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- BTREE_SEARCH  |PARTITIONED|
-                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- SPLIT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$139, $$140]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
index 59805cc..bed547f 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
@@ -10,95 +10,136 @@
           -- HASH_PARTITION_EXCHANGE [$$141, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$116]  |PARTITIONED|
+                -- STABLE_SORT [$$142(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$142]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- COMMIT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INDEX_INSERT_DELETE  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- COMMIT  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$111]  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
+                                    -- INDEX_INSERT_DELETE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- INSERT_DELETE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$129]  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- HYBRID_HASH_JOIN [$$126][$$125]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$126]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- PRE_CLUSTERED_GROUP_BY[$$169]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$124, $$122][$$117, $$118]  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$124, $$122]  |PARTITIONED|
+                                                                      -- STABLE_SORT [$$169(ASC)]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                    -- ASSIGN  |UNPARTITIONED|
-                                                                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$125]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$119][$$135]  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- BTREE_SEARCH  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- STABLE_SORT [$$145(ASC)]  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- HYBRID_HASH_JOIN [$$134][$$137]  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$134]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- UNNEST  |PARTITIONED|
                                                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- BTREE_SEARCH  |PARTITIONED|
+                                                                                            -- PRE_CLUSTERED_GROUP_BY[$$166]  |PARTITIONED|
+                                                                                                    {
+                                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                    }
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- ASSIGN  |PARTITIONED|
-                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
-                                                                          -- NESTED_LOOP  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- STREAM_SELECT  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                -- STABLE_SORT [$$166(ASC)]  |PARTITIONED|
+                                                                                                  -- HASH_PARTITION_EXCHANGE [$$166]  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- HYBRID_HASH_JOIN [$$146][$$145]  |PARTITIONED|
+                                                                                                          -- HASH_PARTITION_EXCHANGE [$$146]  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                          -- HASH_PARTITION_EXCHANGE [$$145]  |PARTITIONED|
+                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- HYBRID_HASH_JOIN [$$135][$$163]  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- STABLE_SORT [$$174(ASC)]  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                    -- HASH_PARTITION_EXCHANGE [$$163]  |PARTITIONED|
+                                                                                                                      -- NESTED_LOOP  |PARTITIONED|
+                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                    -- STABLE_SORT [$$178(ASC)]  |PARTITIONED|
+                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- STABLE_SORT [$$181(ASC)]  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$137]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- STABLE_SORT [$$149(ASC)]  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- HYBRID_HASH_JOIN [$$149, $$151][$$139, $$140]  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$149, $$151]  |PARTITIONED|
                                                                                             -- STREAM_PROJECT  |PARTITIONED|
-                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- BTREE_SEARCH  |PARTITIONED|
-                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- STREAM_SELECT  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- BTREE_SEARCH  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- STABLE_SORT [$$152(ASC)]  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$139, $$140]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
index a080c79..98f19ee 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -1,61 +1,102 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$103, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$103(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$103, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                -- STABLE_SORT [$$104(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$104]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- COMMIT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INDEX_INSERT_DELETE  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- COMMIT  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
+                                    -- INDEX_INSERT_DELETE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- INSERT_DELETE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- NESTED_LOOP  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$84]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                -- ASSIGN  |UNPARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$80, $$81]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- PRE_CLUSTERED_GROUP_BY[$$122]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                      -- STABLE_SORT [$$122(ASC)]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$122]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- HYBRID_HASH_JOIN [$$97][$$99]  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- UNNEST  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- PRE_CLUSTERED_GROUP_BY[$$119]  |PARTITIONED|
+                                                                                                    {
+                                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                    }
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- STABLE_SORT [$$119(ASC)]  |PARTITIONED|
+                                                                                                  -- HASH_PARTITION_EXCHANGE [$$119]  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$99]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- HYBRID_HASH_JOIN [$$109, $$111][$$101, $$102]  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$109, $$111]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$101, $$102]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
index 770617f..8b0e599 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
@@ -1,64 +1,114 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- STREAM_PROJECT  |PARTITIONED|
-    -- STREAM_PROJECT  |PARTITIONED|
-      -- ASSIGN  |PARTITIONED|
-        -- STREAM_PROJECT  |PARTITIONED|
-          -- STREAM_SELECT  |PARTITIONED|
-            -- STREAM_PROJECT  |PARTITIONED|
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- HYBRID_HASH_JOIN [$$128][$$135]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
+    -- ASSIGN  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- PRE_CLUSTERED_GROUP_BY[$$195, $$143, $$channelExecutionTime]  |PARTITIONED|
+                {
+                  -- AGGREGATE  |LOCAL|
+                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                }
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STABLE_SORT [$$195(ASC), $$143(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$195, $$143, $$channelExecutionTime]  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- HYBRID_HASH_JOIN [$$126, $$124][$$117, $$118]  |PARTITIONED|
-                          -- HASH_PARTITION_EXCHANGE [$$126, $$124]  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      -- ASSIGN  |UNPARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                          -- HASH_PARTITION_EXCHANGE [$$117, $$118]  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- DATASOURCE_SCAN  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$135]  |PARTITIONED|
-                    -- NESTED_LOOP  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- STREAM_SELECT  |PARTITIONED|
-                          -- ASSIGN  |PARTITIONED|
+                        -- HYBRID_HASH_JOIN [$$183, $$185][$$181, $$182]  |PARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$183, $$185]  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                      -- BROADCAST_EXCHANGE  |PARTITIONED|
-                        -- PRE_CLUSTERED_GROUP_BY[$$120]  |PARTITIONED|
-                                {
-                                  -- AGGREGATE  |LOCAL|
-                                    -- STREAM_SELECT  |LOCAL|
-                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                }
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- STABLE_SORT [$$120(ASC)]  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$120]  |PARTITIONED|
-                                -- NESTED_LOOP  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- HYBRID_HASH_JOIN [$$175][$$179]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$175]  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- STREAM_SELECT  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- HYBRID_HASH_JOIN [$$188][$$199]  |PARTITIONED|
+                                                  -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                  -- HASH_PARTITION_EXCHANGE [$$199]  |PARTITIONED|
+                                                    -- NESTED_LOOP  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- PRE_CLUSTERED_GROUP_BY[$$231]  |PARTITIONED|
+                                                                    {
+                                                                      -- AGGREGATE  |LOCAL|
+                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                    }
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STABLE_SORT [$$231(ASC)]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$231]  |PARTITIONED|
+                                                                    -- UNION_ALL  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- BTREE_SEARCH  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- SPLIT  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- RTREE_SEARCH  |PARTITIONED|
+                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- SPLIT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- RTREE_SEARCH  |PARTITIONED|
+                                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$179]  |PARTITIONED|
                                     -- STREAM_PROJECT  |PARTITIONED|
                                       -- ASSIGN  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- DATASOURCE_SCAN  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                               -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ASSIGN  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                          -- HASH_PARTITION_EXCHANGE [$$181, $$182]  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
index 19dac15..7446647 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -1,75 +1,193 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$103, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$103(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$103, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                -- STABLE_SORT [$$104(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$104]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- COMMIT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INDEX_INSERT_DELETE  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- COMMIT  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
+                                    -- INDEX_INSERT_DELETE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- INSERT_DELETE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- NESTED_LOOP  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$84]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                -- ASSIGN  |UNPARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$80, $$81]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- PRE_CLUSTERED_GROUP_BY[$$122]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                      -- STABLE_SORT [$$122(ASC)]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$122]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- HYBRID_HASH_JOIN [$$97][$$99]  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- UNNEST  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- PRE_CLUSTERED_GROUP_BY[$$119]  |PARTITIONED|
+                                                                                                    {
+                                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                    }
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- STABLE_SORT [$$119(ASC)]  |PARTITIONED|
+                                                                                                  -- HASH_PARTITION_EXCHANGE [$$119]  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$99]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- HYBRID_HASH_JOIN [$$109, $$111][$$101, $$102]  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$109, $$111]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$101, $$102]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+-- COMMIT  |LOCAL|
+  -- STREAM_PROJECT  |LOCAL|
+    -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+      -- INSERT_DELETE  |LOCAL|
+        -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+          -- MATERIALIZE  |LOCAL|
+            -- HASH_PARTITION_EXCHANGE [$$52]  |LOCAL|
+              -- ASSIGN  |LOCAL|
+                -- STREAM_PROJECT  |LOCAL|
+                  -- ASSIGN  |LOCAL|
+                    -- STREAM_PROJECT  |LOCAL|
+                      -- UNNEST  |LOCAL|
+                        -- STREAM_PROJECT  |LOCAL|
+                          -- ASSIGN  |LOCAL|
+                            -- SUBPLAN  |LOCAL|
+                                    {
+                                      -- AGGREGATE  |LOCAL|
+                                        -- ASSIGN  |LOCAL|
+                                          -- STREAM_SELECT  |LOCAL|
+                                            -- ASSIGN  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                    }
+                              -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                -- PRE_CLUSTERED_GROUP_BY[$$55]  |LOCAL|
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- STREAM_SELECT  |LOCAL|
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                  -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                    -- STABLE_SORT [$$55(ASC)]  |LOCAL|
+                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                        -- STREAM_PROJECT  |UNPARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$55][$$64]  |UNPARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                -- AGGREGATE  |UNPARTITIONED|
+                                                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                    -- AGGREGATE  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                -- NESTED_LOOP  |UNPARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                    -- ASSIGN  |UNPARTITIONED|
+                                                      -- STREAM_SELECT  |UNPARTITIONED|
+                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                            -- AGGREGATE  |PARTITIONED|
+                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                    -- STREAM_PROJECT  |UNPARTITIONED|
+                                                      -- ASSIGN  |UNPARTITIONED|
+                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- COMMIT  |PARTITIONED|
-      -- STREAM_PROJECT  |PARTITIONED|
-        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- INSERT_DELETE  |PARTITIONED|
-            -- HASH_PARTITION_EXCHANGE [$$12]  |PARTITIONED|
-              -- ASSIGN  |UNPARTITIONED|
-                -- STREAM_PROJECT  |UNPARTITIONED|
-                  -- ASSIGN  |UNPARTITIONED|
-                    -- STREAM_PROJECT  |UNPARTITIONED|
-                      -- ASSIGN  |UNPARTITIONED|
-                        -- ASSIGN  |UNPARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- COMMIT  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- INSERT_DELETE  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$28, $$29]  |PARTITIONED|
+                -- ASSIGN  |UNPARTITIONED|
+                  -- STREAM_PROJECT  |UNPARTITIONED|
+                    -- ASSIGN  |UNPARTITIONED|
+                      -- STREAM_LIMIT  |UNPARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- SORT_MERGE_EXCHANGE [$$34(ASC) ]  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_LIMIT  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
index a847ef0..709ed35 100644
--- a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
+++ b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -1,64 +1,105 @@
 -- NOTIFY_BROKERS  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- PRE_CLUSTERED_GROUP_BY[$$94, $$channelExecutionTime]  |PARTITIONED|
+    -- PRE_CLUSTERED_GROUP_BY[$$103, $$channelExecutionTime]  |PARTITIONED|
             {
               -- AGGREGATE  |LOCAL|
                 -- NESTED_TUPLE_SOURCE  |LOCAL|
             }
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-        -- STABLE_SORT [$$94(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
-          -- HASH_PARTITION_EXCHANGE [$$94, $$channelExecutionTime]  |PARTITIONED|
+        -- STABLE_SORT [$$103(ASC), $$channelExecutionTime(ASC)]  |PARTITIONED|
+          -- HASH_PARTITION_EXCHANGE [$$103, $$channelExecutionTime]  |PARTITIONED|
             -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- STABLE_SORT [$$79(ASC)]  |PARTITIONED|
-                  -- HASH_PARTITION_EXCHANGE [$$79]  |PARTITIONED|
+                -- STABLE_SORT [$$104(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$104]  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
-                      -- COMMIT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INDEX_INSERT_DELETE  |PARTITIONED|
-                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- UNNEST  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- COMMIT  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- INSERT_DELETE  |PARTITIONED|
-                                      -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
+                                    -- INDEX_INSERT_DELETE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- INSERT_DELETE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
                                                 -- ASSIGN  |PARTITIONED|
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- NESTED_LOOP  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$86, $$84][$$80, $$81]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$86, $$84]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                -- ASSIGN  |UNPARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$80, $$81]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- PRE_CLUSTERED_GROUP_BY[$$122]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                      -- STABLE_SORT [$$122(ASC)]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$122]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- HYBRID_HASH_JOIN [$$97][$$99]  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- UNNEST  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- PRE_CLUSTERED_GROUP_BY[$$119]  |PARTITIONED|
+                                                                                                    {
+                                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                    }
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- STABLE_SORT [$$119(ASC)]  |PARTITIONED|
+                                                                                                  -- HASH_PARTITION_EXCHANGE [$$119]  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$99]  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- HYBRID_HASH_JOIN [$$109, $$111][$$101, $$102]  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$109, $$111]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                          -- HASH_PARTITION_EXCHANGE [$$101, $$102]  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
 -- COMMIT  |PARTITIONED|
   -- STREAM_PROJECT  |PARTITIONED|
     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -68,8 +109,9 @@
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- ASSIGN  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- BTREE_SEARCH  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- ASSIGN  |PARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- DATASOURCE_SCAN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
index 0a38e41..8d43491 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
@@ -25,110 +25,106 @@
 
 use channels;
 
-insert into EmergencyChannelSubscriptions(
-[
-{"param0" : "w2294u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4321u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3398u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w2488u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3666u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4489u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p78u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p544u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p711u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2828u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4796u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4082u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4923u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w2324u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1339u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p520u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1092u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4979u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1487u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4330u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3682u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p117u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w1741u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w2434u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3833u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1373u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p89u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4003u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c910u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4961u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4475u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w1960u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p438u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1362u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p588u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c902u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4684u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1609u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1510u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3851u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1418u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2559u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w1815u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4924u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3320u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p663u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4571u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p781u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c919u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1121u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p814u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4006u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2822u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4953u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3486u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3107u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2836u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w2003u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3256u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4762u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4900u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p357u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3630u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3166u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4687u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p817u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4433u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3426u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p582u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3388u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4823u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1664u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4051u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c857u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1412u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2521u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3114u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p404u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p111u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3006u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2903u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2823u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4153u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2589u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1459u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p766u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p593u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p168u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4253u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4177u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p387u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2571u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "c1513u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p618u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2735u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t4859u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "w1848u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t3306u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "t2558u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" },
-{"param0" : "p180u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" }
-]
-);
+subscribe to EmergencyChannel("w2294u1") on brokerA;
+subscribe to EmergencyChannel("t4321u1") on brokerA;
+subscribe to EmergencyChannel("t3398u1") on brokerA;
+subscribe to EmergencyChannel("w2488u1") on brokerA;
+subscribe to EmergencyChannel("t3666u1") on brokerA;
+subscribe to EmergencyChannel("t4489u1") on brokerA;
+subscribe to EmergencyChannel("p78u1") on brokerA;
+subscribe to EmergencyChannel("p544u1") on brokerA;
+subscribe to EmergencyChannel("p711u1") on brokerA;
+subscribe to EmergencyChannel("t2828u1") on brokerA;
+subscribe to EmergencyChannel("t4796u1") on brokerA;
+subscribe to EmergencyChannel("t4082u1") on brokerA;
+subscribe to EmergencyChannel("t4923u1") on brokerA;
+subscribe to EmergencyChannel("w2324u1") on brokerA;
+subscribe to EmergencyChannel("c1339u1") on brokerA;
+subscribe to EmergencyChannel("p520u1") on brokerA;
+subscribe to EmergencyChannel("c1092u1") on brokerA;
+subscribe to EmergencyChannel("t4979u1") on brokerA;
+subscribe to EmergencyChannel("c1487u1") on brokerA;
+subscribe to EmergencyChannel("t4330u1") on brokerA;
+subscribe to EmergencyChannel("t3682u1") on brokerA;
+subscribe to EmergencyChannel("p117u1") on brokerA;
+subscribe to EmergencyChannel("w1741u1") on brokerA;
+subscribe to EmergencyChannel("w2434u1") on brokerA;
+subscribe to EmergencyChannel("t3833u1") on brokerA;
+subscribe to EmergencyChannel("c1373u1") on brokerA;
+subscribe to EmergencyChannel("p89u1") on brokerA;
+subscribe to EmergencyChannel("t4003u1") on brokerA;
+subscribe to EmergencyChannel("c910u1") on brokerA;
+subscribe to EmergencyChannel("t4961u1") on brokerA;
+subscribe to EmergencyChannel("t4475u1") on brokerA;
+subscribe to EmergencyChannel("w1960u1") on brokerA;
+subscribe to EmergencyChannel("p438u1") on brokerA;
+subscribe to EmergencyChannel("c1362u1") on brokerA;
+subscribe to EmergencyChannel("p588u1") on brokerA;
+subscribe to EmergencyChannel("c902u1") on brokerA;
+subscribe to EmergencyChannel("t4684u1") on brokerA;
+subscribe to EmergencyChannel("c1609u1") on brokerA;
+subscribe to EmergencyChannel("c1510u1") on brokerA;
+subscribe to EmergencyChannel("t3851u1") on brokerA;
+subscribe to EmergencyChannel("c1418u1") on brokerA;
+subscribe to EmergencyChannel("t2559u1") on brokerA;
+subscribe to EmergencyChannel("w1815u1") on brokerA;
+subscribe to EmergencyChannel("t4924u1") on brokerA;
+subscribe to EmergencyChannel("t3320u1") on brokerA;
+subscribe to EmergencyChannel("p663u1") on brokerA;
+subscribe to EmergencyChannel("t4571u1") on brokerA;
+subscribe to EmergencyChannel("p781u1") on brokerA;
+subscribe to EmergencyChannel("c919u1") on brokerA;
+subscribe to EmergencyChannel("c1121u1") on brokerA;
+subscribe to EmergencyChannel("p814u1") on brokerA;
+subscribe to EmergencyChannel("t4006u1") on brokerA;
+subscribe to EmergencyChannel("t2822u1") on brokerA;
+subscribe to EmergencyChannel("t4953u1") on brokerA;
+subscribe to EmergencyChannel("t3486u1") on brokerA;
+subscribe to EmergencyChannel("t3107u1") on brokerA;
+subscribe to EmergencyChannel("t2836u1") on brokerA;
+subscribe to EmergencyChannel("w2003u1") on brokerA;
+subscribe to EmergencyChannel("t3256u1") on brokerA;
+subscribe to EmergencyChannel("t4762u1") on brokerA;
+subscribe to EmergencyChannel("t4900u1") on brokerA;
+subscribe to EmergencyChannel("p357u1") on brokerA;
+subscribe to EmergencyChannel("t3630u1") on brokerA;
+subscribe to EmergencyChannel("t3166u1") on brokerA;
+subscribe to EmergencyChannel("t4687u1") on brokerA;
+subscribe to EmergencyChannel("p817u1") on brokerA;
+subscribe to EmergencyChannel("t4433u1") on brokerA;
+subscribe to EmergencyChannel("t3426u1") on brokerA;
+subscribe to EmergencyChannel("p582u1") on brokerA;
+subscribe to EmergencyChannel("t3388u1") on brokerA;
+subscribe to EmergencyChannel("t4823u1") on brokerA;
+subscribe to EmergencyChannel("c1664u1") on brokerA;
+subscribe to EmergencyChannel("t4051u1") on brokerA;
+subscribe to EmergencyChannel("c857u1") on brokerA;
+subscribe to EmergencyChannel("c1412u1") on brokerA;
+subscribe to EmergencyChannel("t2521u1") on brokerA;
+subscribe to EmergencyChannel("t3114u1") on brokerA;
+subscribe to EmergencyChannel("p404u1") on brokerA;
+subscribe to EmergencyChannel("p111u1") on brokerA;
+subscribe to EmergencyChannel("t3006u1") on brokerA;
+subscribe to EmergencyChannel("t2903u1") on brokerA;
+subscribe to EmergencyChannel("t2823u1") on brokerA;
+subscribe to EmergencyChannel("t4153u1") on brokerA;
+subscribe to EmergencyChannel("t2589u1") on brokerA;
+subscribe to EmergencyChannel("c1459u1") on brokerA;
+subscribe to EmergencyChannel("p766u1") on brokerA;
+subscribe to EmergencyChannel("p593u1") on brokerA;
+subscribe to EmergencyChannel("p168u1") on brokerA;
+subscribe to EmergencyChannel("t4253u1") on brokerA;
+subscribe to EmergencyChannel("t4177u1") on brokerA;
+subscribe to EmergencyChannel("p387u1") on brokerA;
+subscribe to EmergencyChannel("t2571u1") on brokerA;
+subscribe to EmergencyChannel("c1513u1") on brokerA;
+subscribe to EmergencyChannel("p618u1") on brokerA;
+subscribe to EmergencyChannel("t2735u1") on brokerA;
+subscribe to EmergencyChannel("t4859u1") on brokerA;
+subscribe to EmergencyChannel("w1848u1") on brokerA;
+subscribe to EmergencyChannel("t3306u1") on brokerA;
+subscribe to EmergencyChannel("t2558u1") on brokerA;
+subscribe to EmergencyChannel("p180u1") on brokerA;
 
 insert into Shelters (
 [
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
similarity index 97%
rename from asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
rename to asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
index dd6e1ca..0cc76dc 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
@@ -22,6 +22,7 @@
 * Date         : Apr 2018
 * Author       : Steven Jacobs
 */
+// polltimeoutsecs=15
 
 use channels;
 
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
index c9ac4c8..f93a5cf 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
@@ -21,6 +21,7 @@
 select value x
 from Metadata.`Channel` result, Metadata.`Dataset` x
 where result.ChannelName = "nearbyTweetChannel"
-and (x.DatasetName = result.SubscriptionsDatasetName
+and (x.DatasetName = result.ChannelSubscriptionsDatasetName
+or x.DatasetName = result.BrokerSubscriptionsDatasetName
 or x.DatasetName = result.ResultsDatasetName)
 order by x.DatasetName;
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
index 7a1c5a1..52af22d 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
@@ -20,7 +20,8 @@
 
 select value x
 from Metadata.`Channel` result, Metadata.`Dataset` x
-where x.DatasetName = result.SubscriptionsDatasetName
+where x.DatasetName = result.BrokerSubscriptionsDatasetName
+or x.DatasetName = result.ChannelSubscriptionsDatasetName
 or x.DatasetName = result.ResultsDatasetName
 order by x.DatasetName
 ;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
index f6dc2bf..13e644e 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
@@ -54,4 +54,4 @@
 
 create repetitive channel nearbyTweetChannel using channels.NearbyTweetsContainingText@2 period duration("PT10M");
 
-drop dataset two.nearbyTweetChannelSubscriptions;
\ No newline at end of file
+drop dataset two.nearbyTweetChannelChannelSubscriptions;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
index 8612077..1044c26 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
@@ -25,7 +25,7 @@
 
 use channels;
 
-select test.BrokerName, test.param0
-from roomRecordsSubscriptions test
-order by test.BrokerName
+select test.param0
+from roomRecordsChannelSubscriptions test
+order by test.param0
 ;
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
new file mode 100644
index 0000000..06f1b4e
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Create matching subscriptions and check results
+* Expected Res : Success
+* Date         : Jun 2018
+* Author       : Steven Jacobs
+*/
+
+drop dataverse two if exists;
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type userLocation as {
+  userId: int,
+  roomNumber: int
+};
+create type watchedUser as {
+  userId: int,
+  name: string
+};
+create type roomSecurity as {
+  roomNumber: int,
+  securityGuardName: string,
+  securityGuardNumber: string
+};
+
+create dataset watchedUsers(watchedUser)
+primary key userId;
+
+create dataset roomSecurityAssignments(roomSecurity)
+primary key roomNumber;
+
+upsert into roomSecurityAssignments([
+{"roomNumber":123, "securityGuardName":"Mike", "securityGuardNumber":"555_4815"},
+{"roomNumber":222, "securityGuardName":"Steven", "securityGuardNumber":"555_1623"},
+{"roomNumber":350, "securityGuardName":"Vassilis", "securityGuardNumber":"555_1234"}]
+);
+
+upsert into watchedUsers([
+{"userId":1, "name":"suspectNumber1"}]
+);
+
+
+create dataset UserLocations(userLocation)
+primary key userId;
+
+create function RoomOccupants(room) {
+  (select location.userId
+  from UserLocations location
+  where location.roomNumber = room)
+};
+
+create broker brokerA at "http://www.notifyA.com";
+create broker brokerB at "http://www.notifyB.com";
+
+create repetitive channel roomRecords using RoomOccupants@1 period duration("PT5S");
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
similarity index 81%
copy from asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
copy to asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
index 5f764b3..15a2246 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
@@ -16,11 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Check Whether a Channel works after adding a new Index
-* Expected Res : Success
-* Date         : Apr 2018
-* Author       : Steven Jacobs
-*/
+use channels;
 
-15000
\ No newline at end of file
+subscribe to roomRecords(123) on brokerA;
+subscribe to roomRecords(350) on brokerA;
+subscribe to roomRecords(123) on brokerA;
+subscribe to roomRecords(350) on brokerB;
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
similarity index 82%
copy from asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
copy to asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
index 5f764b3..c3a8dee 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
@@ -16,11 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Check Whether a Channel works after adding a new Index
-* Expected Res : Success
-* Date         : Apr 2018
-* Author       : Steven Jacobs
-*/
+use channels;
 
-15000
\ No newline at end of file
+select param0 from roomRecordsChannelSubscriptions
+order by param0;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
similarity index 82%
rename from asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
rename to asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
index 5f764b3..17c0a01 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
@@ -16,11 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Check Whether a Channel works after adding a new Index
-* Expected Res : Success
-* Date         : Apr 2018
-* Author       : Steven Jacobs
-*/
+use channels;
 
-15000
\ No newline at end of file
+select DataverseName,BrokerName from roomRecordsBrokerSubscriptions
+order by BrokerName;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
similarity index 82%
copy from asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
copy to asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
index 5f764b3..7c14a29 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
@@ -16,11 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Check Whether a Channel works after adding a new Index
-* Expected Res : Success
-* Date         : Apr 2018
-* Author       : Steven Jacobs
-*/
+use channels;
 
-15000
\ No newline at end of file
+upsert into UserLocations([
+{"userId":1, "roomNumber":123},
+{"userId":2, "roomNumber":222},
+{"userId":3, "roomNumber":350}]
+);
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
similarity index 82%
copy from asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
copy to asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
index 5f764b3..990002a 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
@@ -16,11 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Check Whether a Channel works after adding a new Index
-* Expected Res : Success
-* Date         : Apr 2018
-* Author       : Steven Jacobs
-*/
+ // polltimeoutsecs=15
 
-15000
\ No newline at end of file
+use channels;
+
+array_count((select * from roomRecordsResults));
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
index cd0510d..5c3a4de 100644
--- a/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
@@ -19,6 +19,6 @@
 use channels;
 
 select value param1
-from nearbyTweetChannelSubscriptions
+from nearbyTweetChannelChannelSubscriptions
 order by param1
 ;
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
index 776192d..59442c8 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
@@ -1,2 +1,3 @@
-{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelResults", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannelResults", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Mon Sep 12 13:48:16 PDT 2016", "DatasetId": 103, "PendingOp": 0 }
-{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannelSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Mon Sep 12 13:48:16 PDT 2016", "DatasetId": 102, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelBrokerSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "BrokerSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannelBrokerSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "channelSubId" ], [ "brokerSubId" ] ], "PrimaryKey": [ [ "channelSubId" ], [ "brokerSubId" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:48:20 PDT 2018", "DatasetId": 133, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelChannelSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannelChannelSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "channelSubId" ] ], "PrimaryKey": [ [ "channelSubId" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:48:20 PDT 2018", "DatasetId": 132, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannelResults", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannelResults", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:48:20 PDT 2018", "DatasetId": 134, "PendingOp": 0 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
index 225d83f..f2169ac 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -1 +1 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannelResults" ], [ "channels", "nearbyTweetChannelSubscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannelResults as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() as deliveryTime\nfrom channels.nearbyTweetChannelSubscriptions sub,\nMetadata.Broker b, \nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) returning a;" }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "ChannelSubscriptionsDatasetName": "nearbyTweetChannelChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannelResults" ], [ "channels", "nearbyTweetChannelChannelSubscriptions" ], [ "channels", "nearbyTweetChannelBrokerSubscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannelResults as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.channelSubId as channelSubId,current_datetime() as deliveryTime,\n(select b.BrokerEndPoint, bs.brokerSubId from\nchannels.nearbyTweetChannelBrokerSubscriptions bs,\nMetadata.Broker b\nwhere bs.BrokerName = b.BrokerName\nand bs.DataverseName = b.DataverseName\nand bs.channelSubId = sub.channelSu
 bId\n) as brokerSubIds\nfrom channels.nearbyTweetChannelChannelSubscriptions sub,\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \n) returning a;", "BrokerSubscriptionsDatasetName": "nearbyTweetChannelBrokerSubscriptions" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
index 7d160fa..16063bc 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
@@ -1,4 +1,6 @@
-{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel1Results", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:56 PDT 2016", "DatasetId": 103, "PendingOp": 0 }
-{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Subscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel1Subscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:56 PDT 2016", "DatasetId": 102, "PendingOp": 0 }
-{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel3Results", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:58 PDT 2016", "DatasetId": 107, "PendingOp": 0 }
-{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Subscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel3Subscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "subscriptionId" ] ], "PrimaryKey": [ [ "subscriptionId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Tue Sep 13 09:50:58 PDT 2016", "DatasetId": 106, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1BrokerSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "BrokerSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel1BrokerSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "channelSubId" ], [ "brokerSubId" ] ], "PrimaryKey": [ [ "channelSubId" ], [ "brokerSubId" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:58:14 PDT 2018", "DatasetId": 103, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1ChannelSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel1ChannelSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "channelSubId" ] ], "PrimaryKey": [ [ "channelSubId" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:58:14 PDT 2018", "DatasetId": 102, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel1Results", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:58:14 PDT 2018", "DatasetId": 104, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3BrokerSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "BrokerSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel3BrokerSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "channelSubId" ], [ "brokerSubId" ] ], "PrimaryKey": [ [ "channelSubId" ], [ "brokerSubId" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:58:16 PDT 2018", "DatasetId": 109, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3ChannelSubscriptions", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelSubscriptionsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel3ChannelSubscriptions", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "channelSubId" ] ], "PrimaryKey": [ [ "channelSubId" ] ], "Autogenerated": false }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:58:16 PDT 2018", "DatasetId": 108, "PendingOp": 0 }
+{ "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Results", "DatatypeDataverseName": "Metadata", "DatatypeName": "ChannelResultsType", "DatasetType": "INTERNAL", "GroupName": "channels.nearbyTweetChannel3Results", "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ], "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ [ "resultId" ] ], "PrimaryKey": [ [ "resultId" ] ], "Autogenerated": true }, "Hints": {{  }}, "Timestamp": "Wed Jun 20 14:58:16 PDT 2018", "DatasetId": 110, "PendingOp": 0 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
index 8d8899d..364cfab 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -1,2 +1,2 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel1Results" ], [ "channels", "nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannel1Results as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() as deliveryTime\nfrom channels.nearbyTweetChannel1Subscriptions sub,\nMetadata.Broker b, \nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) returning a;" }
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel3Results" ], [ "channels", "nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannel3Results as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() as deliveryTime\nfrom channels.nearbyTweetChannel3Subscriptions sub,\nMetadata.Broker b, \nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) returning a;" }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "ChannelSubscriptionsDatasetName": "nearbyTweetChannel1ChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel1Results" ], [ "channels", "nearbyTweetChannel1ChannelSubscriptions" ], [ "channels", "nearbyTweetChannel1BrokerSubscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannel1Results as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.channelSubId as channelSubId,current_datetime() as deliveryTime,\n(select b.BrokerEndPoint, bs.brokerSubId from\nchannels.nearbyTweetChannel1BrokerSubscriptions bs,\nMetadata.Broker b\nwhere bs.BrokerName = b.BrokerName\nand bs.DataverseName = b.DataverseName\nand bs.channelSubId = sub.c
 hannelSubId\n) as brokerSubIds\nfrom channels.nearbyTweetChannel1ChannelSubscriptions sub,\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \n) returning a;", "BrokerSubscriptionsDatasetName": "nearbyTweetChannel1BrokerSubscriptions" }
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "ChannelSubscriptionsDatasetName": "nearbyTweetChannel3ChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", "NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ "channels", "nearbyTweetChannel3Results" ], [ "channels", "nearbyTweetChannel3ChannelSubscriptions" ], [ "channels", "nearbyTweetChannel3BrokerSubscriptions" ] ], [ [ "channels", "NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with \"false\";\ninsert into channels.nearbyTweetChannel3Results as a (\nwith channelExecutionTime as current_datetime() \nselect result, channelExecutionTime, sub.channelSubId as channelSubId,current_datetime() as deliveryTime,\n(select b.BrokerEndPoint, bs.brokerSubId from\nchannels.nearbyTweetChannel3BrokerSubscriptions bs,\nMetadata.Broker b\nwhere bs.BrokerName = b.BrokerName\nand bs.DataverseName = b.DataverseName\nand bs.channelSubId = sub.c
 hannelSubId\n) as brokerSubIds\nfrom channels.nearbyTweetChannel3ChannelSubscriptions sub,\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \n) returning a;", "BrokerSubscriptionsDatasetName": "nearbyTweetChannel3BrokerSubscriptions" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
index ce8cc0f..426ca7d 100644
--- a/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
@@ -1,2 +1,2 @@
-{ "BrokerName": "brokerA", "param0": 123 }
-{ "BrokerName": "brokerB", "param0": 350 }
\ No newline at end of file
+{ "param0": 123 }
+{ "param0": 350 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm b/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
new file mode 100644
index 0000000..426ca7d
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
@@ -0,0 +1,2 @@
+{ "param0": 123 }
+{ "param0": 350 }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm b/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
new file mode 100644
index 0000000..2bb764e
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
@@ -0,0 +1,4 @@
+{ "DataverseName": "channels", "BrokerName": "brokerA" }
+{ "DataverseName": "channels", "BrokerName": "brokerA" }
+{ "DataverseName": "channels", "BrokerName": "brokerA" }
+{ "DataverseName": "channels", "BrokerName": "brokerB" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm b/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 4640af1..7309bfb 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -154,7 +154,7 @@
     <test-case FilePath="channel">
       <compilation-unit name="drop_subscriptions">
         <output-dir compare="Text">drop_subscriptions</output-dir>
-        <expected-error>Cannot alter dataset two.nearbyTweetChannelSubscriptions. two.nearbyTweetChannel(Channel) depends on it!</expected-error>
+        <expected-error>Cannot alter dataset two.nearbyTweetChannelChannelSubscriptions. two.nearbyTweetChannel(Channel) depends on it!</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
@@ -164,6 +164,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
+      <compilation-unit name="shared_subscriptions">
+        <output-dir compare="Text">shared_subscriptions</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
       <compilation-unit name="room_occupants">
         <output-dir compare="Text">room_occupants</output-dir>
       </compilation-unit>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Xikui Wang has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 1:

(18 comments)

added some comments.

https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java:

Line 181:         StringBuilder builder = new StringBuilder();
One side comment. I think these constructed stmts are one per query. if not, maybe you want to change it accordingly, as you've heard from them that multiple stmts in one query will soon become invalid.


Line 182:         builder.append("upsert into " + channelSubscriptionsDataset + "(\n");
quick Q. This works without use dataverse?


PS1, Line 183: s
Change this a constant and give it a better name.


PS1, Line 185: param
also, is this part of certain meta data schema?


PS1, Line 185: param
Change it to a constant variable.


PS1, Line 195: param
same


PS1, Line 229: param
same


PS1, Line 249: s
same here


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java:

PS1, Line 184: partitionFields = new ArrayList<>();
             :         keyIndicators = new ArrayList<>();
             :         keyIndicators.add(0);
             :         keyIndicators.add(0);
             :         fieldName = new ArrayList<>();
             :         List<String> fieldName2 = new ArrayList<>();
             :         fieldName.add(BADConstants.ChannelSubscriptionId);
             :         fieldName2.add(BADConstants.BrokerSubscriptionId);
             :         partitionFields.add(fieldName);
             :         partitionFields.add(fieldName2);
Do you mind explain this a little bit? also, regular_name+number is very confusing...


PS1, Line 280: a
I'm ok with constructing query strings. One minor suggestion would be to create constant variables for variable names that are used in the query. There are several benefits: 1) easier for future maintenance; 2) If the variable means something, it can be shown in the names; 3) less likely to have conflicts when adding new variables.


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
File asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java:

Line 186:         //Maybe we need to add a project???
?


Line 245: 
empty line


Line 343:     /*This function is used to find specific operators within the plan, either
You should be able to use the function signature to get a better template for this.


PS1, Line 352:  String param1, String param2
these two parameters seem not being used except for in recursive call... safely to remove?


Line 352:     private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
I think you could use LogicalOperatorTag here directly


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
File asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp:

PS1, Line 30: param0
correlation with the createChannel param0? (probably not...)


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
File asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp:

PS1, Line 21: param0
same here?


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
File asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp:

Line 19:  // polltimeoutsecs=15
ws


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 3:

(2 comments)

Addressed comments

https://asterix-gerrit.ics.uci.edu/#/c/2731/3/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java:

PS3, Line 348: new StringReader(builder.toString())
> Just notice this. You should be able to pass String directly and to avoid u
Done


https://asterix-gerrit.ics.uci.edu/#/c/2731/3/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
File asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java:

PS3, Line 343:     //
> remove this
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 2:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/323/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/2731

to look at the new patch set (#3).

Change subject: Move channels to a result sharing framework
......................................................................

Move channels to a result sharing framework

Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
M asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
A asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
39 files changed, 1,454 insertions(+), 824 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/31/2731/3
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 2:

(6 comments)

Addressed Comments

https://asterix-gerrit.ics.uci.edu/#/c/2731/2/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java:

PS2, Line 181: ng channelSubVar = "channelSub";
             :         String param = 
> Move these to the top of the class and make it public static final. Like th
Done


https://asterix-gerrit.ics.uci.edu/#/c/2731/2/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java:

PS2, Line 185: keyIndicators = new ArrayList<>();
             :         keyIndicators.add(0);
             :         keyIndicators.add(0);
> it seems the key indicator is not modified but only used as a parameter. If
Done


PS2, Line 207:  new ArrayList<>();
             :             channelSubKey.add(BADConstants.ResultId);
             :             partitionFields.add(ch
> Replaceable with singletonList?
Done


PS2, Line 278: nsertedRecordVar = "a";
             :         String channelSubscriptionRecordVar = "sub";
             :         String channelParamPrefix = channelSubscriptionRecordVar + ".param";
             :         String functionResultVar = "result";
             :         String brokerRecordVar = "b";
             :         String brokerSu
> same as the previous one. Change it to private static final String and put 
Done


https://asterix-gerrit.ics.uci.edu/#/c/2731/2/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
File asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java:

Line 343:     //Find Specific Operators within the plan
> Please restore the comments and format it like this:
Done


PS2, Line 374: tractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
             :                         subscriptionsName, subscriptionType);
             :                 if (nestedOp != null) {
             :                     return nestedOp;
             :                 }
> Minor code-style suggestion. If it's a recursive function, put the recursiv
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/2731

to look at the new patch set (#5).

Change subject: Move channels to a result sharing framework
......................................................................

Move channels to a result sharing framework

Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
M asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
A asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
40 files changed, 1,471 insertions(+), 860 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/31/2731/5
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/2731

to look at the new patch set (#6).

Change subject: Move channels to a result sharing framework
......................................................................

Move channels to a result sharing framework

Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
M asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
A asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
40 files changed, 1,469 insertions(+), 863 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/31/2731/6
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Xikui Wang has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 2:

(6 comments)

added some more comments. Congratulations to the new dad! :)

https://asterix-gerrit.ics.uci.edu/#/c/2731/2/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java:

PS2, Line 181: ng channelSubVar = "channelSub";
             :         String param = 
Move these to the top of the class and make it public static final. Like this one: org.apache.hyracks.storage.am.common.datagen.DocumentStringFieldValueGenerator
Please do the same for the others.


https://asterix-gerrit.ics.uci.edu/#/c/2731/2/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java:

PS2, Line 185: keyIndicators = new ArrayList<>();
             :         keyIndicators.add(0);
             :         keyIndicators.add(0);
it seems the key indicator is not modified but only used as a parameter. If so, replace these with Collections.singletonList(0) and put it into the parameter lists directly.


PS2, Line 207:  new ArrayList<>();
             :             channelSubKey.add(BADConstants.ResultId);
             :             partitionFields.add(ch
Replaceable with singletonList?


PS2, Line 278: nsertedRecordVar = "a";
             :         String channelSubscriptionRecordVar = "sub";
             :         String channelParamPrefix = channelSubscriptionRecordVar + ".param";
             :         String functionResultVar = "result";
             :         String brokerRecordVar = "b";
             :         String brokerSu
same as the previous one. Change it to private static final String and put it at the top.


https://asterix-gerrit.ics.uci.edu/#/c/2731/2/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
File asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java:

Line 343:     //Find Specific Operators within the plan
Please restore the comments and format it like this:
/**
     * 
     * @param partitioningExpr
     * @param keySourceIndicators
     * @param autogenerated
     * @param filterField
     */
(You cant get this nice tempalte by type /** infront of a method and hit new line in Intellij automatically.)


PS2, Line 374: tractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), searchTag,
             :                         subscriptionsName, subscriptionType);
             :                 if (nestedOp != null) {
             :                     return nestedOp;
             :                 }
Minor code-style suggestion. If it's a recursive function, put the recursive call branch at the top would be easier for others to understand the logic.


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/2731

to look at the new patch set (#2).

Change subject: Move channels to a result sharing framework
......................................................................

Move channels to a result sharing framework

Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
M asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
A asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
39 files changed, 1,443 insertions(+), 821 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/31/2731/2
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/2731

to look at the new patch set (#4).

Change subject: Move channels to a result sharing framework
......................................................................

Move channels to a result sharing framework

Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced-index-only.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-advanced.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-push.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
M asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
A asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.1.ddl.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.2.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
R asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.4.query.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.5.update.sqlpp
C asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
M asterix-bad/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.5.query.sqlpp
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_datasets/create_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/results/channel/room_occupants/room_occupants.1.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.3.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.4.adm
A asterix-bad/src/test/resources/runtimets/results/channel/shared_subscriptions/shared_subscriptions.6.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
39 files changed, 1,453 insertions(+), 828 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/31/2731/4
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/322/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 6:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/340/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 1:

(18 comments)

https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java:

Line 181:         StringBuilder builder = new StringBuilder();
> One side comment. I think these constructed stmts are one per query. if not
I think the BAD stuff is okay, I only use "SET as a second statement, which he mentioned will still work"


Line 182:         builder.append("upsert into " + channelSubscriptionsDataset + "(\n");
> quick Q. This works without use dataverse?
The names passed are absolute so no "use" is needed.


PS1, Line 183: s
> Change this a constant and give it a better name.
Done


PS1, Line 185: param
> also, is this part of certain meta data schema?
The ChannelSubscriptions dataset automatically names the fields for the param values as param0,param1,...


PS1, Line 185: param
> Change it to a constant variable.
Done


PS1, Line 195: param
> same
Done


PS1, Line 229: param
> same
Done


PS1, Line 249: s
> same here
Done


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java:

PS1, Line 184: partitionFields = new ArrayList<>();
             :         keyIndicators = new ArrayList<>();
             :         keyIndicators.add(0);
             :         keyIndicators.add(0);
             :         fieldName = new ArrayList<>();
             :         List<String> fieldName2 = new ArrayList<>();
             :         fieldName.add(BADConstants.ChannelSubscriptionId);
             :         fieldName2.add(BADConstants.BrokerSubscriptionId);
             :         partitionFields.add(fieldName);
             :         partitionFields.add(fieldName2);
> Do you mind explain this a little bit? also, regular_name+number is very co
I'll clean up the names. Essentially the partitioning fields are each a list (for cases of nested keys). We have two fields for the primary key here, the channelSubscriptionId and the brokerSubscriptionId. Since neither is nested each list is of size one.


PS1, Line 280: a
> I'm ok with constructing query strings. One minor suggestion would be to cr
Done


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
File asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java:

Line 186:         //Maybe we need to add a project???
> ?
Done


Line 245: 
> empty line
Done


Line 343:     /*This function is used to find specific operators within the plan, either
> You should be able to use the function signature to get a better template f
Done


PS1, Line 352:  String param1, String param2
> these two parameters seem not being used except for in recursive call... sa
This method went through several rewrites, so it was pretty messy. I cleaned it up now. Take a look at the new version and see what you think.


Line 352:     private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param1, String param2) {
> I think you could use LogicalOperatorTag here directly
Done


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp
File asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.4.query.sqlpp:

PS1, Line 30: param0
> correlation with the createChannel param0? (probably not...)
This would be the first parameter value from the subscribe statement. The ChannelSubscriptions dataset stores the field names as param0,param1,...


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp
File asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.3.query.sqlpp:

PS1, Line 21: param0
> same here?
same


https://asterix-gerrit.ics.uci.edu/#/c/2731/1/asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp
File asterix-bad/src/test/resources/runtimets/queries/channel/shared_subscriptions/shared_subscriptions.6.pollquery.sqlpp:

Line 19:  // polltimeoutsecs=15
> ws
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Xikui Wang has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 3:

(2 comments)

two minors. let me know it's ready to push and i will +2. :)

https://asterix-gerrit.ics.uci.edu/#/c/2731/3/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java:

PS3, Line 348: new StringReader(builder.toString())
Just notice this. You should be able to pass String directly and to avoid using the StringReader. It's a minor issue, you can fix this in next push...


https://asterix-gerrit.ics.uci.edu/#/c/2731/3/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
File asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java:

PS3, Line 343:     //
remove this


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 4:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/328/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/321/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 5:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/339/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb-bad[master]: Move channels to a result sharing framework

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Move channels to a result sharing framework
......................................................................


Patch Set 3:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/327/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2731
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ifbcdf264bcd21caa0d28a9ac392b36577ca60dad
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: No