You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/25 19:38:35 UTC

[4/7] asterixdb-bad git commit: Runs but produces duplicate results (one per broker subscription)

Runs but produces duplicate results (one per broker subscription)


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/e7a0717f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/e7a0717f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/e7a0717f

Branch: refs/heads/resultsNoReplicate
Commit: e7a0717f91ca33e707b4be96599105ad5e31bc42
Parents: 30f66b8
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Thu Jun 14 15:30:19 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Thu Jun 14 15:30:19 2018 -0700

----------------------------------------------------------------------
 .../org/apache/asterix/bad/BADConstants.java    |  7 +-
 .../lang/statement/ChannelDropStatement.java    |  6 +-
 .../statement/ChannelSubscribeStatement.java    |  7 +-
 .../statement/ChannelUnsubscribeStatement.java  |  4 +-
 .../lang/statement/CreateChannelStatement.java  | 74 ++++++++++++++------
 .../bad/metadata/BADMetadataExtension.java      | 10 ++-
 .../bad/metadata/BADMetadataRecordTypes.java    | 31 +++++---
 .../apache/asterix/bad/metadata/Channel.java    | 19 +++--
 .../bad/metadata/ChannelTupleTranslator.java    | 24 +++++--
 .../InsertBrokerNotifierForChannelRule.java     | 72 +++++++++++--------
 10 files changed, 173 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 0467f6e..db4fc2d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -19,7 +19,8 @@
 package org.apache.asterix.bad;
 
 public interface BADConstants {
-    String SubscriptionId = "subscriptionId";
+    String ChannelSubscriptionId = "channelSubId";
+    String BrokerSubscriptionId = "brokerSubId";
     String BrokerName = "BrokerName";
     String ChannelName = "ChannelName";
     String ProcedureName = "ProcedureName";
@@ -29,9 +30,11 @@ public interface BADConstants {
     String ResultId = "resultId";
     String ChannelExecutionTime = "channelExecutionTime";
     String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+    String BrokerSubscriptionsType = "BrokerSubscriptionsType";
     String ChannelResultsType = "ChannelResultsType";
     String ResultsDatasetName = "ResultsDatasetName";
-    String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+    String ChannelSubscriptionsDatasetName = "ChannelSubscriptionsDatasetName";
+    String BrokerSubscriptionsDatasetName = "BrokerSubscriptionsDatasetName";
     String CHANNEL_EXTENSION_NAME = "Channel";
     String PROCEDURE_KEYWORD = "Procedure";
     String BROKER_KEYWORD = "Broker";

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 8403fcc..5d9d971 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -142,7 +142,11 @@ public class ChannelDropStatement extends ExtensionStatement {
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
             tempMdProvider.getLocks().reset();
             dropStmt = new DropDatasetStatement(new Identifier(dataverse),
-                    new Identifier(channel.getSubscriptionsDataset()), true);
+                    new Identifier(channel.getChannelSubscriptionsDataset()), true);
+            ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
+            tempMdProvider.getLocks().reset();
+            dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+                    new Identifier(channel.getBrokerSubscriptionsDataset()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index ca1241c..cb2498c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -138,7 +138,7 @@ public class ChannelSubscribeStatement extends ExtensionStatement {
                 throw new AsterixException("There is no broker with this name " + brokerName + ".");
             }
 
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
 
             if (argList.size() != channel.getFunction().getArity()) {
                 throw new AsterixException("Channel expected " + channel.getFunction().getArity()
@@ -157,7 +157,7 @@ public class ChannelSubscribeStatement extends ExtensionStatement {
             fb.add(new FieldBinding(leftExpr, rightExpr));
 
             if (subscriptionId != null) {
-                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+                leftExpr = new LiteralExpr(new StringLiteral(BADConstants.ChannelSubscriptionId));
 
                 List<Expression> UUIDList = new ArrayList<>();
                 UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
@@ -190,7 +190,8 @@ public class ChannelSubscribeStatement extends ExtensionStatement {
                 VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
                 VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
                 useResultVar.setIsNewVar(false);
-                FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));
+                FieldAccessor accessor =
+                        new FieldAccessor(useResultVar, new Identifier(BADConstants.ChannelSubscriptionId));
 
                 metadataProvider.setResultSetId(new ResultSetId(resultSetId));
                 boolean resultsAsync =

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1b18f83..03a3f1d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -113,11 +113,11 @@ public class ChannelUnsubscribeStatement extends ExtensionStatement {
                 throw new AsterixException("There is no channel with this name " + channelName + ".");
             }
 
-            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
 
             //Need a condition to say subscription-id = sid
             OperatorExpr condition = new OperatorExpr();
-            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+            FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.ChannelSubscriptionId));
             condition.addOperand(fa);
             condition.setCurrentop(true);
             condition.addOperator("=");

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 03a48dc..5e4222e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -85,7 +85,8 @@ public class CreateChannelStatement extends ExtensionStatement {
     private Identifier dataverseName;
     private String duration;
     private String body;
-    private String subscriptionsTableName;
+    private String channelSubscriptionsTableName;
+    private String brokerSubscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
     private final boolean push;
@@ -113,7 +114,7 @@ public class CreateChannelStatement extends ExtensionStatement {
     }
 
     public String getSubscriptionsName() {
-        return subscriptionsTableName;
+        return channelSubscriptionsTableName;
     }
 
     public String getDuration() {
@@ -159,29 +160,53 @@ public class CreateChannelStatement extends ExtensionStatement {
     private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
 
+        //Create channel subscriptions dataset
         Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
         Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
         //Setup the subscriptions dataset
         List<List<String>> partitionFields = new ArrayList<>();
         List<Integer> keyIndicators = new ArrayList<>();
         keyIndicators.add(0);
-        List<String> fieldNames = new ArrayList<>();
-        fieldNames.add(BADConstants.SubscriptionId);
-        partitionFields.add(fieldNames);
+        List<String> fieldName = new ArrayList<>();
+        fieldName.add(BADConstants.ChannelSubscriptionId);
+        partitionFields.add(fieldName);
         IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
-        DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
-                new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
-                new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
-
-        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+        DatasetDecl createChannelSubscriptionsDataset =
+                new DatasetDecl(dataverseName, new Identifier(channelSubscriptionsTableName),
+                        new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+                createChannelSubscriptionsDataset, hcc, null);
+
+        //Create broker subscriptions dataset
+        Identifier brokerSubscriptionsTypeName = new Identifier(BADConstants.BrokerSubscriptionsType);
+        partitionFields = new ArrayList<>();
+        keyIndicators = new ArrayList<>();
+        keyIndicators.add(0);
+        keyIndicators.add(0);
+        fieldName = new ArrayList<>();
+        List<String> fieldName2 = new ArrayList<>();
+        fieldName.add(BADConstants.ChannelSubscriptionId);
+        fieldName2.add(BADConstants.BrokerSubscriptionId);
+        partitionFields.add(fieldName);
+        partitionFields.add(fieldName2);
+        idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+        DatasetDecl createBrokerSubscriptionsDataset =
+                new DatasetDecl(dataverseName, new Identifier(brokerSubscriptionsTableName),
+                        new Identifier(BADConstants.BAD_DATAVERSE_NAME), brokerSubscriptionsTypeName, null, null, null,
+                        new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+        metadataProvider.getLocks().reset();
+        ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+                createBrokerSubscriptionsDataset,
                 hcc, null);
 
         if (!push) {
             //Setup the results dataset
             partitionFields = new ArrayList<>();
-            fieldNames = new ArrayList<>();
-            fieldNames.add(BADConstants.ResultId);
-            partitionFields.add(fieldNames);
+            fieldName = new ArrayList<>();
+            fieldName.add(BADConstants.ResultId);
+            partitionFields.add(fieldName);
             idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
             DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
                     new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
@@ -225,9 +250,10 @@ public class CreateChannelStatement extends ExtensionStatement {
         builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
         builder.append("select result, ");
         builder.append(BADConstants.ChannelExecutionTime + ", ");
-        builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
+        builder.append("sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
         builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
-        builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
+        builder.append("from " + dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
+        builder.append(dataverse + "." + channelSubscriptionsTableName + " sub,\n");
         builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
         builder.append(function.getNamespace() + "." + function.getName() + "(");
         int i = 0;
@@ -235,8 +261,10 @@ public class CreateChannelStatement extends ExtensionStatement {
             builder.append("sub.param" + i + ",");
         }
         builder.append("sub.param" + i + ") result \n");
-        builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
-        builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
+        builder.append("where b." + BADConstants.BrokerName + " = bs." + BADConstants.BrokerName + "\n");
+        builder.append("and b." + BADConstants.DataverseName + " = bs." + BADConstants.DataverseName + "\n");
+        builder.append(
+                "and bs." + BADConstants.ChannelSubscriptionId + " = sub." + BADConstants.ChannelSubscriptionId + "\n");
         if (!push) {
             builder.append(")");
             builder.append(" returning a");
@@ -269,7 +297,9 @@ public class CreateChannelStatement extends ExtensionStatement {
 
         dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
         dataverse = dataverseName.getValue();
-        subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
+        channelSubscriptionsTableName =
+                channelName + BADConstants.CHANNEL_EXTENSION_NAME + BADConstants.subscriptionEnding;
+        brokerSubscriptionsTableName = channelName + BADConstants.BROKER_KEYWORD + BADConstants.subscriptionEnding;
         resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
 
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
@@ -297,7 +327,10 @@ public class CreateChannelStatement extends ExtensionStatement {
             initialize(mdTxnCtx);
 
             //check if names are available before creating anything
-            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, channelSubscriptionsTableName) != null) {
+                throw new AsterixException("The channel name:" + channelName + " is not available.");
+            }
+            if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, brokerSubscriptionsTableName) != null) {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
             if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
@@ -323,7 +356,8 @@ public class CreateChannelStatement extends ExtensionStatement {
 
             BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
                     duration);
-            channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+            channel = new Channel(dataverse, channelName.getValue(), channelSubscriptionsTableName,
+                    brokerSubscriptionsTableName, resultsTableName, function,
                     duration, null, body);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd2ff86..5be2e84 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -47,8 +47,12 @@ public class BADMetadataExtension implements IMetadataExtension {
     public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
             NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
 
-    public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+    public static final Datatype BAD_CHANNEL_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
             BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+
+    public static final Datatype BAD_BROKER_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+            BADConstants.BrokerSubscriptionsType, BADMetadataRecordTypes.brokerSubscriptionsType, false);
+
     public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
             BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
 
@@ -76,7 +80,6 @@ public class BADMetadataExtension implements IMetadataExtension {
         return new MetadataTupleTranslatorProvider();
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public List<ExtensionMetadataDataset> getExtensionIndexes() {
         try {
@@ -107,7 +110,8 @@ public class BADMetadataExtension implements IMetadataExtension {
                 // MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
                 // insert default data type
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
-                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_SUBSCRIPTION_DATATYPE);
+                MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_SUBSCRIPTION_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index fafa23f..f6e2e1f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -27,16 +27,24 @@ import org.apache.asterix.om.types.IAType;
 
 public class BADMetadataRecordTypes {
 
-    // -------------------------------------- Subscriptions --------------------------------------//
-    private static final String[] subTypeFieldNames =
-            { BADConstants.SubscriptionId };
-    private static final IAType[] subTypeFieldTypes = { BuiltinType.AUUID };
+    // -------------------------------------- Channel Subscriptions --------------------------------------//
+    private static final String[] channelSubTypeFieldNames = { BADConstants.ChannelSubscriptionId };
+    private static final IAType[] channelSubTypeFieldTypes = { BuiltinType.AUUID };
     public static final ARecordType channelSubscriptionsType =
-            new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true);
+            new ARecordType(BADConstants.ChannelSubscriptionsType, channelSubTypeFieldNames, channelSubTypeFieldTypes,
+                    true);
+
+    // -------------------------------------- Broker Subscriptions --------------------------------------//
+    private static final String[] brokerSubTypeFieldNames = { BADConstants.ChannelSubscriptionId,
+            BADConstants.BrokerSubscriptionId, BADConstants.DataverseName, BADConstants.BrokerName };
+    private static final IAType[] brokerSubTypeFieldTypes =
+            { BuiltinType.AUUID, BuiltinType.AUUID, BuiltinType.ASTRING, BuiltinType.ASTRING };
+    public static final ARecordType brokerSubscriptionsType = new ARecordType(BADConstants.BrokerSubscriptionsType,
+            brokerSubTypeFieldNames, brokerSubTypeFieldTypes, true);
 
     // ---------------------------------------- Results --------------------------------------------//
     private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
-            BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+            BADConstants.ChannelSubscriptionId, BADConstants.DeliveryTime };
     private static final IAType[] resultTypeFieldTypes =
             { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, BuiltinType.ADATETIME };
     public static final ARecordType channelResultsType =
@@ -45,25 +53,28 @@ public class BADMetadataRecordTypes {
     //------------------------------------------ Channel ----------------------------------------//     
     public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
-    public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+    public static final int CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
     public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
     public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
     public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
+    public static final int CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX = 8;
     public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_CHANNEL,
             // FieldNames
-            new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+            new String[] { BADConstants.DataverseName, BADConstants.ChannelName,
+                    BADConstants.ChannelSubscriptionsDatasetName,
                     BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
+                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY,
+                    BADConstants.BrokerSubscriptionsDatasetName },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
                     new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
                             null),
-                    BuiltinType.ASTRING },
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
             //IsOpen?
             true);
     //------------------------------------------ Broker ----------------------------------------//

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index ed9346c..61df9af 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -34,7 +34,8 @@ public class Channel implements IExtensionMetadataEntity {
 
     /** A unique identifier for the channel */
     protected final EntityId channelId;
-    private final String subscriptionsDatasetName;
+    private final String channelSubscriptionsDatasetName;
+    private final String brokerSubscriptionsDatasetName;
     private final String resultsDatasetName;
     private final String duration;
     private final String channelBody;
@@ -49,13 +50,15 @@ public class Channel implements IExtensionMetadataEntity {
     */
     private final List<List<List<String>>> dependencies;
 
-    public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+    public Channel(String dataverseName, String channelName, String channelSubscriptionsDatasetName,
+            String brokerSubscriptionsDatasetName, String resultsDataset,
             FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
         this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
-        this.subscriptionsDatasetName = subscriptionsDataset;
+        this.channelSubscriptionsDatasetName = channelSubscriptionsDatasetName;
+        this.brokerSubscriptionsDatasetName = brokerSubscriptionsDatasetName;
         this.channelBody = channelBody;
         if (this.function.getNamespace() == null) {
             this.function.setNamespace(dataverseName);
@@ -67,7 +70,7 @@ public class Channel implements IExtensionMetadataEntity {
             this.dependencies.add(new ArrayList<>());
             this.dependencies.add(new ArrayList<>());
             List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName);
-            List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName);
+            List<String> subscriptionList = Arrays.asList(dataverseName, channelSubscriptionsDatasetName);
             this.dependencies.get(0).add(resultsList);
             this.dependencies.get(0).add(subscriptionList);
             this.dependencies.get(1).add(functionAsPath);
@@ -84,8 +87,12 @@ public class Channel implements IExtensionMetadataEntity {
         return dependencies;
     }
 
-    public String getSubscriptionsDataset() {
-        return subscriptionsDatasetName;
+    public String getChannelSubscriptionsDataset() {
+        return channelSubscriptionsDatasetName;
+    }
+
+    public String getBrokerSubscriptionsDataset() {
+        return brokerSubscriptionsDatasetName;
     }
 
     public String getResultsDatasetName() {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 175280e..6cf7f50 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -82,8 +82,9 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
         String channelName =
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX))
                         .getStringValue();
-        String subscriptionsName = ((AString) channelRecord
-                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+        String channelSubscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+                        .getStringValue();
         String resultsName =
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX))
                         .getStringValue();
@@ -127,10 +128,15 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
                 ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
                         .getStringValue();
 
+        String brokerSubscriptionsName = ((AString) channelRecord
+                .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+                        .getStringValue();
+
         FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
                 Integer.parseInt(functionSignature.get(2)));
 
-        channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
+        channel = new Channel(dataverseName, channelName, channelSubscriptionsName, brokerSubscriptionsName,
+                resultsName, signature, duration,
                 dependencies, channelBody);
         return channel;
     }
@@ -164,9 +170,10 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
 
         // write field 2
         fieldValue.reset();
-        aString.setValue(channel.getSubscriptionsDataset());
+        aString.setValue(channel.getChannelSubscriptionsDataset());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+                fieldValue);
 
         // write field 3
         fieldValue.reset();
@@ -227,6 +234,13 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
 
+        // write field 8
+        fieldValue.reset();
+        aString.setValue(channel.getBrokerSubscriptionsDataset());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+                fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..82ca1f1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -76,25 +76,19 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
         if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
-        boolean push = false;
 
-        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
-            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
-                return false;
-            }
+        boolean push = false;
+        AbstractLogicalOperator op = findOp(op1, 4, "");
+        if (op == null) {
             push = true;
         }
+
         DataSourceScanOperator subscriptionsScan;
         String channelDataverse;
         String channelName;
 
         if (!push) {
-            DelegateOperator eOp = (DelegateOperator) op;
-            if (!(eOp.getDelegate() instanceof CommitOperator)) {
-                return false;
-            }
-            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
             if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                 return false;
             }
@@ -113,15 +107,19 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             //Now we know that we are inserting into results
 
             channelName = datasetName.substring(0, datasetName.length() - 7);
-            String subscriptionsName = channelName + "Subscriptions";
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+            String subscriptionsName = channelName + "BrokerSubscriptions";
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, subscriptionsName);
             if (subscriptionsScan == null) {
                 return false;
             }
 
         } else {
+            op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+            if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+                return false;
+            }
             //if push, get the channel name here instead
-            subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+            subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "");
             if (subscriptionsScan == null) {
                 return false;
             }
@@ -133,13 +131,13 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
 
         //Now we need to get the broker EndPoint
         LogicalVariable brokerEndpointVar = context.newVar();
-        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "");
         if (opAboveBrokersScan == null) {
             return false;
         }
 
         //get subscriptionIdVar
-        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(1);
 
         //The channelExecutionTime is created just before the scan
         ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -162,7 +160,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
         context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
         context.computeAndSetTypeEnvironmentForOperator(op);
 
-        ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
+        ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "");
         badProject.getVariables().add(subscriptionIdVar);
         badProject.getVariables().add(brokerEndpointVar);
         badProject.getVariables().add(channelExecutionVar);
@@ -310,49 +308,65 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
     }
 
     /*This function is used to find specific operators within the plan, either
-     * A. The brokers dataset scan
-     * B. The subscriptions scan
-     * C. The highest project of the plan
+     * 1. The brokers dataset scan
+     * 2. The highest project of the plan
+     * 3. The subscriptions scan
+     * 4. Commit operator
+     *
+     * param is the name of the channel when searching for a subscriptions scan
      */
-    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param) {
         if (!op.hasInputs()) {
             return null;
         }
         for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
-            if (lookingForString.equals("brokers")) {
+            if (searchId == 1) {
                 if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
                     return op;
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
 
-            } else if (lookingForString.equals("project")) {
+            } else if (searchId == 2) {
                 if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
             }
 
-            else {
-                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+            else if (searchId == 3) {
+                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param)) {
                     return (AbstractLogicalOperator) subOp.getValue();
                 } else {
                     AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
-                            lookingForString);
+                            searchId, param);
                     if (nestedOp != null) {
                         return nestedOp;
                     }
                 }
 
+            } else if (searchId == 4) {
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator dOp = (DelegateOperator) subOp.getValue();
+                    if (dOp.getDelegate() instanceof CommitOperator) {
+                        return (AbstractLogicalOperator) subOp.getValue();
+                    }
+                } else {
+                    AbstractLogicalOperator nestedOp =
+                            findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
             }
         }
         return null;
@@ -376,7 +390,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
             if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
                 DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
                 if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
-                        && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+                        && dds.getDataset().getItemTypeName().equals(BADConstants.BrokerSubscriptionsType)) {
                     if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
                         return true;
                     }