You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/06/25 19:38:35 UTC
[4/7] asterixdb-bad git commit: Runs but produces duplicate results
(one per broker subscription)
Runs but produces duplicate results (one per broker subscription)
Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/e7a0717f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/e7a0717f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/e7a0717f
Branch: refs/heads/resultsNoReplicate
Commit: e7a0717f91ca33e707b4be96599105ad5e31bc42
Parents: 30f66b8
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Thu Jun 14 15:30:19 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Thu Jun 14 15:30:19 2018 -0700
----------------------------------------------------------------------
.../org/apache/asterix/bad/BADConstants.java | 7 +-
.../lang/statement/ChannelDropStatement.java | 6 +-
.../statement/ChannelSubscribeStatement.java | 7 +-
.../statement/ChannelUnsubscribeStatement.java | 4 +-
.../lang/statement/CreateChannelStatement.java | 74 ++++++++++++++------
.../bad/metadata/BADMetadataExtension.java | 10 ++-
.../bad/metadata/BADMetadataRecordTypes.java | 31 +++++---
.../apache/asterix/bad/metadata/Channel.java | 19 +++--
.../bad/metadata/ChannelTupleTranslator.java | 24 +++++--
.../InsertBrokerNotifierForChannelRule.java | 72 +++++++++++--------
10 files changed, 173 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index 0467f6e..db4fc2d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -19,7 +19,8 @@
package org.apache.asterix.bad;
public interface BADConstants {
- String SubscriptionId = "subscriptionId";
+ String ChannelSubscriptionId = "channelSubId";
+ String BrokerSubscriptionId = "brokerSubId";
String BrokerName = "BrokerName";
String ChannelName = "ChannelName";
String ProcedureName = "ProcedureName";
@@ -29,9 +30,11 @@ public interface BADConstants {
String ResultId = "resultId";
String ChannelExecutionTime = "channelExecutionTime";
String ChannelSubscriptionsType = "ChannelSubscriptionsType";
+ String BrokerSubscriptionsType = "BrokerSubscriptionsType";
String ChannelResultsType = "ChannelResultsType";
String ResultsDatasetName = "ResultsDatasetName";
- String SubscriptionsDatasetName = "SubscriptionsDatasetName";
+ String ChannelSubscriptionsDatasetName = "ChannelSubscriptionsDatasetName";
+ String BrokerSubscriptionsDatasetName = "BrokerSubscriptionsDatasetName";
String CHANNEL_EXTENSION_NAME = "Channel";
String PROCEDURE_KEYWORD = "Procedure";
String BROKER_KEYWORD = "Broker";
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 8403fcc..5d9d971 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -142,7 +142,11 @@ public class ChannelDropStatement extends ExtensionStatement {
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
tempMdProvider.getLocks().reset();
dropStmt = new DropDatasetStatement(new Identifier(dataverse),
- new Identifier(channel.getSubscriptionsDataset()), true);
+ new Identifier(channel.getChannelSubscriptionsDataset()), true);
+ ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
+ tempMdProvider.getLocks().reset();
+ dropStmt = new DropDatasetStatement(new Identifier(dataverse),
+ new Identifier(channel.getBrokerSubscriptionsDataset()), true);
((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index ca1241c..cb2498c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -138,7 +138,7 @@ public class ChannelSubscribeStatement extends ExtensionStatement {
throw new AsterixException("There is no broker with this name " + brokerName + ".");
}
- String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+ String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
if (argList.size() != channel.getFunction().getArity()) {
throw new AsterixException("Channel expected " + channel.getFunction().getArity()
@@ -157,7 +157,7 @@ public class ChannelSubscribeStatement extends ExtensionStatement {
fb.add(new FieldBinding(leftExpr, rightExpr));
if (subscriptionId != null) {
- leftExpr = new LiteralExpr(new StringLiteral(BADConstants.SubscriptionId));
+ leftExpr = new LiteralExpr(new StringLiteral(BADConstants.ChannelSubscriptionId));
List<Expression> UUIDList = new ArrayList<>();
UUIDList.add(new LiteralExpr(new StringLiteral(subscriptionId)));
@@ -190,7 +190,8 @@ public class ChannelSubscribeStatement extends ExtensionStatement {
VariableExpr resultVar = new VariableExpr(new VarIdentifier("$result", 0));
VariableExpr useResultVar = new VariableExpr(new VarIdentifier("$result", 0));
useResultVar.setIsNewVar(false);
- FieldAccessor accessor = new FieldAccessor(useResultVar, new Identifier(BADConstants.SubscriptionId));
+ FieldAccessor accessor =
+ new FieldAccessor(useResultVar, new Identifier(BADConstants.ChannelSubscriptionId));
metadataProvider.setResultSetId(new ResultSetId(resultSetId));
boolean resultsAsync =
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 1b18f83..03a3f1d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -113,11 +113,11 @@ public class ChannelUnsubscribeStatement extends ExtensionStatement {
throw new AsterixException("There is no channel with this name " + channelName + ".");
}
- String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+ String subscriptionsDatasetName = channel.getChannelSubscriptionsDataset();
//Need a condition to say subscription-id = sid
OperatorExpr condition = new OperatorExpr();
- FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.SubscriptionId));
+ FieldAccessor fa = new FieldAccessor(vars, new Identifier(BADConstants.ChannelSubscriptionId));
condition.addOperand(fa);
condition.setCurrentop(true);
condition.addOperator("=");
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 03a48dc..5e4222e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -85,7 +85,8 @@ public class CreateChannelStatement extends ExtensionStatement {
private Identifier dataverseName;
private String duration;
private String body;
- private String subscriptionsTableName;
+ private String channelSubscriptionsTableName;
+ private String brokerSubscriptionsTableName;
private String resultsTableName;
private String dataverse;
private final boolean push;
@@ -113,7 +114,7 @@ public class CreateChannelStatement extends ExtensionStatement {
}
public String getSubscriptionsName() {
- return subscriptionsTableName;
+ return channelSubscriptionsTableName;
}
public String getDuration() {
@@ -159,29 +160,53 @@ public class CreateChannelStatement extends ExtensionStatement {
private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws AsterixException, Exception {
+ //Create channel subscriptions dataset
Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
List<List<String>> partitionFields = new ArrayList<>();
List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
- List<String> fieldNames = new ArrayList<>();
- fieldNames.add(BADConstants.SubscriptionId);
- partitionFields.add(fieldNames);
+ List<String> fieldName = new ArrayList<>();
+ fieldName.add(BADConstants.ChannelSubscriptionId);
+ partitionFields.add(fieldName);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
- DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName),
- new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
- new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
-
- ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+ DatasetDecl createChannelSubscriptionsDataset =
+ new DatasetDecl(dataverseName, new Identifier(channelSubscriptionsTableName),
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null,
+ new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+ createChannelSubscriptionsDataset, hcc, null);
+
+ //Create broker subscriptions dataset
+ Identifier brokerSubscriptionsTypeName = new Identifier(BADConstants.BrokerSubscriptionsType);
+ partitionFields = new ArrayList<>();
+ keyIndicators = new ArrayList<>();
+ keyIndicators.add(0);
+ keyIndicators.add(0);
+ fieldName = new ArrayList<>();
+ List<String> fieldName2 = new ArrayList<>();
+ fieldName.add(BADConstants.ChannelSubscriptionId);
+ fieldName2.add(BADConstants.BrokerSubscriptionId);
+ partitionFields.add(fieldName);
+ partitionFields.add(fieldName2);
+ idd = new InternalDetailsDecl(partitionFields, keyIndicators, false, null);
+ DatasetDecl createBrokerSubscriptionsDataset =
+ new DatasetDecl(dataverseName, new Identifier(brokerSubscriptionsTableName),
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), brokerSubscriptionsTypeName, null, null, null,
+ new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true);
+ metadataProvider.getLocks().reset();
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider,
+ createBrokerSubscriptionsDataset,
hcc, null);
if (!push) {
//Setup the results dataset
partitionFields = new ArrayList<>();
- fieldNames = new ArrayList<>();
- fieldNames.add(BADConstants.ResultId);
- partitionFields.add(fieldNames);
+ fieldName = new ArrayList<>();
+ fieldName.add(BADConstants.ResultId);
+ partitionFields.add(fieldName);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null);
DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName),
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(),
@@ -225,9 +250,10 @@ public class CreateChannelStatement extends ExtensionStatement {
builder.append("with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
- builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
+ builder.append("sub." + BADConstants.ChannelSubscriptionId + " as " + BADConstants.ChannelSubscriptionId + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
- builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n");
+ builder.append("from " + dataverse + "." + brokerSubscriptionsTableName + " bs,\n");
+ builder.append(dataverse + "." + channelSubscriptionsTableName + " sub,\n");
builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
builder.append(function.getNamespace() + "." + function.getName() + "(");
int i = 0;
@@ -235,8 +261,10 @@ public class CreateChannelStatement extends ExtensionStatement {
builder.append("sub.param" + i + ",");
}
builder.append("sub.param" + i + ") result \n");
- builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
- builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
+ builder.append("where b." + BADConstants.BrokerName + " = bs." + BADConstants.BrokerName + "\n");
+ builder.append("and b." + BADConstants.DataverseName + " = bs." + BADConstants.DataverseName + "\n");
+ builder.append(
+ "and bs." + BADConstants.ChannelSubscriptionId + " = sub." + BADConstants.ChannelSubscriptionId + "\n");
if (!push) {
builder.append(")");
builder.append(" returning a");
@@ -269,7 +297,9 @@ public class CreateChannelStatement extends ExtensionStatement {
dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName));
dataverse = dataverseName.getValue();
- subscriptionsTableName = channelName + BADConstants.subscriptionEnding;
+ channelSubscriptionsTableName =
+ channelName + BADConstants.CHANNEL_EXTENSION_NAME + BADConstants.subscriptionEnding;
+ brokerSubscriptionsTableName = channelName + BADConstants.BROKER_KEYWORD + BADConstants.subscriptionEnding;
resultsTableName = push ? "" : channelName + BADConstants.resultsEnding;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
@@ -297,7 +327,10 @@ public class CreateChannelStatement extends ExtensionStatement {
initialize(mdTxnCtx);
//check if names are available before creating anything
- if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) {
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, channelSubscriptionsTableName) != null) {
+ throw new AsterixException("The channel name:" + channelName + " is not available.");
+ }
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, brokerSubscriptionsTableName) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) {
@@ -323,7 +356,8 @@ public class CreateChannelStatement extends ExtensionStatement {
BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(),
duration);
- channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
+ channel = new Channel(dataverse, channelName.getValue(), channelSubscriptionsTableName,
+ brokerSubscriptionsTableName, resultsTableName, function,
duration, null, body);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd2ff86..5be2e84 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -47,8 +47,12 @@ public class BADMetadataExtension implements IMetadataExtension {
public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
- public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ public static final Datatype BAD_CHANNEL_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+
+ public static final Datatype BAD_BROKER_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+ BADConstants.BrokerSubscriptionsType, BADMetadataRecordTypes.brokerSubscriptionsType, false);
+
public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
@@ -76,7 +80,6 @@ public class BADMetadataExtension implements IMetadataExtension {
return new MetadataTupleTranslatorProvider();
}
- @SuppressWarnings("rawtypes")
@Override
public List<ExtensionMetadataDataset> getExtensionIndexes() {
try {
@@ -107,7 +110,8 @@ public class BADMetadataExtension implements IMetadataExtension {
// MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
// insert default data type
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_SUBSCRIPTION_DATATYPE);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_SUBSCRIPTION_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_BROKER_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_CHANNEL_DATATYPE);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_PROCEDURE_DATATYPE);
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index fafa23f..f6e2e1f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -27,16 +27,24 @@ import org.apache.asterix.om.types.IAType;
public class BADMetadataRecordTypes {
- // -------------------------------------- Subscriptions --------------------------------------//
- private static final String[] subTypeFieldNames =
- { BADConstants.SubscriptionId };
- private static final IAType[] subTypeFieldTypes = { BuiltinType.AUUID };
+ // -------------------------------------- Channel Subscriptions --------------------------------------//
+ private static final String[] channelSubTypeFieldNames = { BADConstants.ChannelSubscriptionId };
+ private static final IAType[] channelSubTypeFieldTypes = { BuiltinType.AUUID };
public static final ARecordType channelSubscriptionsType =
- new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true);
+ new ARecordType(BADConstants.ChannelSubscriptionsType, channelSubTypeFieldNames, channelSubTypeFieldTypes,
+ true);
+
+ // -------------------------------------- Broker Subscriptions --------------------------------------//
+ private static final String[] brokerSubTypeFieldNames = { BADConstants.ChannelSubscriptionId,
+ BADConstants.BrokerSubscriptionId, BADConstants.DataverseName, BADConstants.BrokerName };
+ private static final IAType[] brokerSubTypeFieldTypes =
+ { BuiltinType.AUUID, BuiltinType.AUUID, BuiltinType.ASTRING, BuiltinType.ASTRING };
+ public static final ARecordType brokerSubscriptionsType = new ARecordType(BADConstants.BrokerSubscriptionsType,
+ brokerSubTypeFieldNames, brokerSubTypeFieldTypes, true);
// ---------------------------------------- Results --------------------------------------------//
private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
- BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+ BADConstants.ChannelSubscriptionId, BADConstants.DeliveryTime };
private static final IAType[] resultTypeFieldTypes =
{ BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, BuiltinType.ADATETIME };
public static final ARecordType channelResultsType =
@@ -45,25 +53,28 @@ public class BADMetadataRecordTypes {
//------------------------------------------ Channel ----------------------------------------//
public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
- public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+ public static final int CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
+ public static final int CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX = 8;
public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
// RecordTypeName
BADConstants.RECORD_TYPENAME_CHANNEL,
// FieldNames
- new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
+ new String[] { BADConstants.DataverseName, BADConstants.ChannelName,
+ BADConstants.ChannelSubscriptionsDatasetName,
BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
- BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
+ BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY,
+ BADConstants.BrokerSubscriptionsDatasetName },
// FieldTypes
new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
null),
- BuiltinType.ASTRING },
+ BuiltinType.ASTRING, BuiltinType.ASTRING },
//IsOpen?
true);
//------------------------------------------ Broker ----------------------------------------//
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index ed9346c..61df9af 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -34,7 +34,8 @@ public class Channel implements IExtensionMetadataEntity {
/** A unique identifier for the channel */
protected final EntityId channelId;
- private final String subscriptionsDatasetName;
+ private final String channelSubscriptionsDatasetName;
+ private final String brokerSubscriptionsDatasetName;
private final String resultsDatasetName;
private final String duration;
private final String channelBody;
@@ -49,13 +50,15 @@ public class Channel implements IExtensionMetadataEntity {
*/
private final List<List<List<String>>> dependencies;
- public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+ public Channel(String dataverseName, String channelName, String channelSubscriptionsDatasetName,
+ String brokerSubscriptionsDatasetName, String resultsDataset,
FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
this.function = function;
this.duration = duration;
this.resultsDatasetName = resultsDataset;
- this.subscriptionsDatasetName = subscriptionsDataset;
+ this.channelSubscriptionsDatasetName = channelSubscriptionsDatasetName;
+ this.brokerSubscriptionsDatasetName = brokerSubscriptionsDatasetName;
this.channelBody = channelBody;
if (this.function.getNamespace() == null) {
this.function.setNamespace(dataverseName);
@@ -67,7 +70,7 @@ public class Channel implements IExtensionMetadataEntity {
this.dependencies.add(new ArrayList<>());
this.dependencies.add(new ArrayList<>());
List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName);
- List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName);
+ List<String> subscriptionList = Arrays.asList(dataverseName, channelSubscriptionsDatasetName);
this.dependencies.get(0).add(resultsList);
this.dependencies.get(0).add(subscriptionList);
this.dependencies.get(1).add(functionAsPath);
@@ -84,8 +87,12 @@ public class Channel implements IExtensionMetadataEntity {
return dependencies;
}
- public String getSubscriptionsDataset() {
- return subscriptionsDatasetName;
+ public String getChannelSubscriptionsDataset() {
+ return channelSubscriptionsDatasetName;
+ }
+
+ public String getBrokerSubscriptionsDataset() {
+ return brokerSubscriptionsDatasetName;
}
public String getResultsDatasetName() {
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 175280e..6cf7f50 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -82,8 +82,9 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
String channelName =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX))
.getStringValue();
- String subscriptionsName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
+ String channelSubscriptionsName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+ .getStringValue();
String resultsName =
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX))
.getStringValue();
@@ -127,10 +128,15 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
.getStringValue();
+ String brokerSubscriptionsName = ((AString) channelRecord
+ .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX))
+ .getStringValue();
+
FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
Integer.parseInt(functionSignature.get(2)));
- channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
+ channel = new Channel(dataverseName, channelName, channelSubscriptionsName, brokerSubscriptionsName,
+ resultsName, signature, duration,
dependencies, channelBody);
return channel;
}
@@ -164,9 +170,10 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
// write field 2
fieldValue.reset();
- aString.setValue(channel.getSubscriptionsDataset());
+ aString.setValue(channel.getChannelSubscriptionsDataset());
stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+ fieldValue);
// write field 3
fieldValue.reset();
@@ -227,6 +234,13 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
+ // write field 8
+ fieldValue.reset();
+ aString.setValue(channel.getBrokerSubscriptionsDataset());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BROKER_SUBSCRIPTIONS_NAME_FIELD_INDEX,
+ fieldValue);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/e7a0717f/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
index 9ead7f0..82ca1f1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -76,25 +76,19 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
- boolean push = false;
- AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
- if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
- return false;
- }
+ boolean push = false;
+ AbstractLogicalOperator op = findOp(op1, 4, "");
+ if (op == null) {
push = true;
}
+
DataSourceScanOperator subscriptionsScan;
String channelDataverse;
String channelName;
if (!push) {
- DelegateOperator eOp = (DelegateOperator) op;
- if (!(eOp.getDelegate() instanceof CommitOperator)) {
- return false;
- }
- AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
return false;
}
@@ -113,15 +107,19 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
//Now we know that we are inserting into results
channelName = datasetName.substring(0, datasetName.length() - 7);
- String subscriptionsName = channelName + "Subscriptions";
- subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+ String subscriptionsName = channelName + "BrokerSubscriptions";
+ subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, subscriptionsName);
if (subscriptionsScan == null) {
return false;
}
} else {
+ op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
+ return false;
+ }
//if push, get the channel name here instead
- subscriptionsScan = (DataSourceScanOperator) findOp(op, "");
+ subscriptionsScan = (DataSourceScanOperator) findOp(op, 3, "");
if (subscriptionsScan == null) {
return false;
}
@@ -133,13 +131,13 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
//Now we need to get the broker EndPoint
LogicalVariable brokerEndpointVar = context.newVar();
- AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
+ AbstractLogicalOperator opAboveBrokersScan = findOp(op, 1, "");
if (opAboveBrokersScan == null) {
return false;
}
//get subscriptionIdVar
- LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+ LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(1);
//The channelExecutionTime is created just before the scan
ILogicalOperator channelExecutionAssign = subscriptionsScan.getInputs().get(0).getValue();
@@ -162,7 +160,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
context.computeAndSetTypeEnvironmentForOperator(op);
- ProjectOperator badProject = (ProjectOperator) findOp(op1, "project");
+ ProjectOperator badProject = (ProjectOperator) findOp(op1, 2, "");
badProject.getVariables().add(subscriptionIdVar);
badProject.getVariables().add(brokerEndpointVar);
badProject.getVariables().add(channelExecutionVar);
@@ -310,49 +308,65 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
}
/*This function is used to find specific operators within the plan, either
- * A. The brokers dataset scan
- * B. The subscriptions scan
- * C. The highest project of the plan
+ * 1. The brokers dataset scan
+ * 2. The highest project of the plan
+ * 3. The subscriptions scan
+ * 4. Commit operator
+ *
+ * param is the name of the channel when searching for a subscriptions scan
*/
- private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+ private AbstractLogicalOperator findOp(AbstractLogicalOperator op, int searchId, String param) {
if (!op.hasInputs()) {
return null;
}
for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
- if (lookingForString.equals("brokers")) {
+ if (searchId == 1) {
if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
return op;
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
+ searchId, param);
if (nestedOp != null) {
return nestedOp;
}
}
- } else if (lookingForString.equals("project")) {
+ } else if (searchId == 2) {
if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
+ searchId, param);
if (nestedOp != null) {
return nestedOp;
}
}
}
- else {
- if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+ else if (searchId == 3) {
+ if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), param)) {
return (AbstractLogicalOperator) subOp.getValue();
} else {
AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
+ searchId, param);
if (nestedOp != null) {
return nestedOp;
}
}
+ } else if (searchId == 4) {
+ if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+ DelegateOperator dOp = (DelegateOperator) subOp.getValue();
+ if (dOp.getDelegate() instanceof CommitOperator) {
+ return (AbstractLogicalOperator) subOp.getValue();
+ }
+ } else {
+ AbstractLogicalOperator nestedOp =
+ findOp((AbstractLogicalOperator) subOp.getValue(), searchId, param);
+ if (nestedOp != null) {
+ return nestedOp;
+ }
+ }
}
}
return null;
@@ -376,7 +390,7 @@ public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule
if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
- && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+ && dds.getDataset().getItemTypeName().equals(BADConstants.BrokerSubscriptionsType)) {
if (subscriptionsName.equals("") || dds.getDataset().getDatasetName().equals(subscriptionsName)) {
return true;
}