You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/12/07 20:59:43 UTC
[5/7] asterixdb-bad git commit: Updated to match code changes to
asterix
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
new file mode 100644
index 0000000..527d65b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Search key over the BAD broker metadata index that is made up of a single
+ * string component: the dataverse name supplied at construction time.
+ */
+public class DataverseBrokersSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+
+    /** Dataverse name that forms the only field of the search tuple. */
+    private final String dataverse;
+
+    /**
+     * @param dataverse
+     *            the dataverse name to place in the search tuple
+     */
+    public DataverseBrokersSearchKey(String dataverse) {
+        this.dataverse = dataverse;
+    }
+
+    /**
+     * @return a tuple built from the dataverse name alone
+     */
+    @Override
+    public ITupleReference getSearchKey() {
+        final ITupleReference searchTuple = MetadataNode.createTuple(dataverse);
+        return searchTuple;
+    }
+
+    /**
+     * @return the id of the BAD broker index this key is evaluated against
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        final ExtensionMetadataDatasetId brokerIndexId = BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+        return brokerIndexId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
new file mode 100644
index 0000000..ffb3ab6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Search key over the BAD channel metadata index that is made up of a single
+ * string component: the dataverse name supplied at construction time.
+ */
+public class DataverseChannelsSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+
+    /** Dataverse name that forms the only field of the search tuple. */
+    private final String dataverse;
+
+    /**
+     * @param dataverse
+     *            the dataverse name to place in the search tuple
+     */
+    public DataverseChannelsSearchKey(String dataverse) {
+        this.dataverse = dataverse;
+    }
+
+    /**
+     * @return a tuple built from the dataverse name alone
+     */
+    @Override
+    public ITupleReference getSearchKey() {
+        final ITupleReference searchTuple = MetadataNode.createTuple(dataverse);
+        return searchTuple;
+    }
+
+    /**
+     * @return the id of the BAD channel index this key is evaluated against
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        final ExtensionMetadataDatasetId channelIndexId = BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+        return channelIndexId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
new file mode 100644
index 0000000..b64bf1b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata entity for a BAD procedure. A procedure is identified by an
+ * {@link EntityId} built from {@link BADConstants#PROCEDURE_KEYWORD}, the
+ * dataverse name and the procedure name; it additionally carries its arity,
+ * parameter names, body, return type and language.
+ * <p>
+ * Equality and hashing are based on the entity id only.
+ */
+public class Procedure implements IExtensionMetadataEntity {
+    private static final long serialVersionUID = 1L;
+    public static final String LANGUAGE_AQL = "AQL";
+    public static final String LANGUAGE_JAVA = "JAVA";
+
+    public static final String RETURNTYPE_VOID = "VOID";
+    public static final String NOT_APPLICABLE = "N/A";
+
+    private final EntityId procedureId;
+    private final int arity;
+    private final List<String> params;
+    private final String body;
+    private final String returnType;
+    private final String language;
+
+    /**
+     * @param dataverseName
+     *            dataverse component of the entity id
+     * @param functionName
+     *            name component of the entity id
+     * @param arity
+     *            arity of the procedure
+     * @param params
+     *            parameter names; the list is stored as given, not copied
+     * @param returnType
+     *            return type, or {@code null} to use {@link #RETURNTYPE_VOID}
+     * @param functionBody
+     *            body of the procedure
+     * @param language
+     *            language of the procedure body
+     */
+    public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType,
+            String functionBody, String language) {
+        this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
+        this.params = params;
+        this.body = functionBody;
+        this.returnType = returnType == null ? RETURNTYPE_VOID : returnType;
+        this.language = language;
+        this.arity = arity;
+    }
+
+    /** @return the id built from the procedure keyword, dataverse name and procedure name */
+    public EntityId getEntityId() {
+        return procedureId;
+    }
+
+    /** @return the parameter list passed to the constructor (same instance) */
+    public List<String> getParams() {
+        return params;
+    }
+
+    /** @return the procedure body */
+    public String getBody() {
+        return body;
+    }
+
+    /** @return the return type; never {@code null}, defaults to {@link #RETURNTYPE_VOID} */
+    public String getReturnType() {
+        return returnType;
+    }
+
+    /** @return the language of the procedure body */
+    public String getLanguage() {
+        return language;
+    }
+
+    /** @return the arity of the procedure */
+    public int getArity() {
+        return arity;
+    }
+
+    /**
+     * Two procedures are equal when their entity ids are equal; the remaining
+     * attributes do not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Procedure)) {
+            return false;
+        }
+        Procedure otherProcedure = (Procedure) other;
+        return otherProcedure.procedureId.equals(procedureId);
+    }
+
+    /**
+     * Derived from the entity id alone so that it stays consistent with
+     * {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return procedureId.hashCode();
+    }
+
+    /** @return the id of the BAD procedure index in which this entity is stored */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
new file mode 100644
index 0000000..6456170
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Search key over the BAD procedure metadata index. The search tuple is built
+ * from three strings, in this order: dataverse, {@code channel}, arity.
+ */
+public class ProcedureSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+
+    /** First component of the search tuple. */
+    private final String dataverse;
+    // NOTE(review): named "channel", but it is the second key component of the
+    // procedure index, which presumably holds the procedure name - confirm.
+    private final String channel;
+    /** Third component of the search tuple; the arity in string form. */
+    private final String arity;
+
+    /**
+     * @param dataverse
+     *            first key component
+     * @param channel
+     *            second key component
+     * @param arity
+     *            third key component, as a string
+     */
+    public ProcedureSearchKey(String dataverse, String channel, String arity) {
+        this.arity = arity;
+        this.channel = channel;
+        this.dataverse = dataverse;
+    }
+
+    /**
+     * @return a tuple holding dataverse, channel and arity, in that order
+     */
+    @Override
+    public ITupleReference getSearchKey() {
+        final ITupleReference searchTuple = MetadataNode.createTuple(dataverse, channel, arity);
+        return searchTuple;
+    }
+
+    /**
+     * @return the id of the BAD procedure index this key is evaluated against
+     */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        final ExtensionMetadataDatasetId procedureIndexId = BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+        return procedureIndexId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
new file mode 100644
index 0000000..f2eab9b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IACursor;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Converts {@link Procedure} metadata entities to tuples and back. A tuple
+ * holds three string key fields (dataverse name, procedure name, arity)
+ * followed by one payload field containing the serialized procedure record.
+ */
+public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> {
+    // Positions of the fields of a serialized Procedure inside a tuple.
+    // Key field #1: dataverse name.
+    public static final int PROCEDURE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+    // Key field #2: procedure name.
+    public static final int PROCEDURE_PROCEDURE_NAME_TUPLE_FIELD_INDEX = 1;
+    // Key field #3: arity.
+    public static final int PROCEDURE_ARITY_TUPLE_FIELD_INDEX = 2;
+
+    // Payload field: the serialized Procedure record.
+    public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+    protected ProcedureTupleTranslator(boolean getTuple) {
+        super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX);
+    }
+
+    /**
+     * Deserializes the payload field of the given tuple into a record and
+     * builds a {@link Procedure} from it.
+     */
+    @Override
+    public Procedure getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+        final int payloadField = PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX;
+        byte[] payloadBytes = frameTuple.getFieldData(payloadField);
+        int payloadStart = frameTuple.getFieldStart(payloadField);
+        int payloadLength = frameTuple.getFieldLength(payloadField);
+        DataInput payloadInput = new DataInputStream(
+                new ByteArrayInputStream(payloadBytes, payloadStart, payloadLength));
+        return createProcedureFromARecord(recordSerDes.deserialize(payloadInput));
+    }
+
+    /** Reads the field at the given position of the record as a string. */
+    private static String stringAt(ARecord record, int fieldIndex) {
+        return ((AString) record.getValueByPos(fieldIndex)).getStringValue();
+    }
+
+    /**
+     * Builds a {@link Procedure} from the fields of a deserialized record; the
+     * arity is stored as a string and parsed back to an int here.
+     */
+    private Procedure createProcedureFromARecord(ARecord procedureRecord) {
+        String dataverseName = stringAt(procedureRecord,
+                BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX);
+        String procedureName = stringAt(procedureRecord,
+                BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX);
+        String arityText = stringAt(procedureRecord,
+                BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX);
+
+        AOrderedList paramList = (AOrderedList) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX);
+        List<String> params = new ArrayList<String>();
+        for (IACursor paramCursor = paramList.getCursor(); paramCursor.next();) {
+            params.add(((AString) paramCursor.get()).getStringValue());
+        }
+
+        String returnType = stringAt(procedureRecord,
+                BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX);
+        String definition = stringAt(procedureRecord,
+                BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX);
+        String language = stringAt(procedureRecord,
+                BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX);
+
+        int arity = Integer.parseInt(arityText);
+        return new Procedure(dataverseName, procedureName, arity, params, returnType, definition, language);
+    }
+
+    /**
+     * Serializes the procedure into a tuple: three key fields first, then the
+     * full procedure record as the payload field.
+     */
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(Procedure procedure) throws IOException, MetadataException {
+        tupleBuilder.reset();
+
+        String dataverseName = procedure.getEntityId().getDataverse();
+        String procedureName = procedure.getEntityId().getEntityName();
+        String arityText = String.valueOf(procedure.getArity());
+
+        // key: the first three fields of the tuple
+        aString.setValue(dataverseName);
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        aString.setValue(procedureName);
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        aString.setValue(arityText);
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        // payload: the fourth field of the tuple
+        recordBuilder.reset(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+        // record field 0: dataverse name
+        fieldValue.reset();
+        aString.setValue(dataverseName);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
+
+        // record field 1: procedure name
+        fieldValue.reset();
+        aString.setValue(procedureName);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX, fieldValue);
+
+        // record field 2: arity (as a string)
+        fieldValue.reset();
+        aString.setValue(arityText);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX, fieldValue);
+
+        // record field 3: ordered list of parameter names
+        OrderedListBuilder paramListBuilder = new OrderedListBuilder();
+        ArrayBackedValueStorage paramStorage = new ArrayBackedValueStorage();
+        paramListBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE
+                .getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX]);
+        for (String paramName : procedure.getParams()) {
+            paramStorage.reset();
+            aString.setValue(paramName);
+            stringSerde.serialize(aString, paramStorage.getDataOutput());
+            paramListBuilder.addItem(paramStorage);
+        }
+        fieldValue.reset();
+        paramListBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX, fieldValue);
+
+        // record field 4: return type
+        fieldValue.reset();
+        aString.setValue(procedure.getReturnType());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX, fieldValue);
+
+        // record field 5: definition (procedure body)
+        fieldValue.reset();
+        aString.setValue(procedure.getBody());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX, fieldValue);
+
+        // record field 6: language
+        fieldValue.reset();
+        aString.setValue(procedure.getLanguage());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX, fieldValue);
+
+        // append the finished record as the payload field
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
new file mode 100644
index 0000000..89f0d20
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
+import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Rewrite rule that attaches a broker-notification branch to a plan that
+ * inserts into a channel's results dataset.
+ * <p>
+ * The rule matches a distribute-result operator whose input is a delegate
+ * operator wrapping a {@link CommitOperator}, which in turn sits on an INSERT
+ * into a dataset whose item type is {@code Metadata.ChannelResultsType} and
+ * whose name ends in {@code Results}. On a match it marks the commit as
+ * non-sink, adds an assign that extracts the broker endpoint above the broker
+ * scan, widens the first project found below the commit, and replaces the
+ * matched operator with a distinct / group-by / notify-broker chain fed by the
+ * commit.
+ */
+public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule {
+
+    private static final String METADATA_DATAVERSE = "Metadata";
+    private static final String RESULTS_SUFFIX = "Results";
+    private static final String FIND_BROKERS = "brokers";
+    private static final String FIND_PROJECT = "project";
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Applies the rewrite described on the class. All pattern checks are done
+     * before the plan is touched, so a plan that does not match is left
+     * unmodified and {@code false} is returned.
+     *
+     * @return {@code true} if the notify-broker branch was installed
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            return false;
+        }
+        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
+            return false;
+        }
+        DelegateOperator eOp = (DelegateOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+        if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+            return false;
+        }
+        InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
+        if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
+            return false;
+        }
+        // Only dataset-backed targets can be channel results datasets.
+        if (!(insertOp.getDataSource() instanceof DatasetDataSource)) {
+            return false;
+        }
+        DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+        String datasetName = dds.getDataset().getDatasetName();
+        if (!METADATA_DATAVERSE.equals(dds.getDataset().getItemTypeDataverseName())
+                || !"ChannelResultsType".equals(dds.getDataset().getItemTypeName())
+                || !datasetName.endsWith(RESULTS_SUFFIX)) {
+            return false;
+        }
+        String channelDataverse = dds.getDataset().getDataverseName();
+        // Now we know that we are inserting into a channel results dataset.
+
+        String channelName = datasetName.substring(0, datasetName.length() - RESULTS_SUFFIX.length());
+        String subscriptionsName = channelName + "Subscriptions";
+        //TODO: Can we check here to see if there is a channel with such a name?
+
+        // Locate every operator the rewrite depends on BEFORE mutating anything,
+        // so that an unexpected plan shape leaves the plan untouched.
+        DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+        if (subscriptionsScan == null) {
+            return false;
+        }
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, FIND_BROKERS);
+        if (opAboveBrokersScan == null) {
+            return false;
+        }
+        AbstractLogicalOperator projectOp = findOp(op, FIND_PROJECT);
+        if (projectOp == null) {
+            return false;
+        }
+        // The channel execution time is expected to be assigned just below the subscriptions scan.
+        if (subscriptionsScan.getInputs().isEmpty()
+                || !(subscriptionsScan.getInputs().get(0).getValue() instanceof AssignOperator)) {
+            return false;
+        }
+        LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
+                .getVariables().get(0);
+        ProjectOperator badProject = (ProjectOperator) projectOp;
+
+        // The commit is no longer the top of its branch: make it a non-sink commit.
+        ((CommitOperator) eOp.getDelegate()).setSink(false);
+
+        // Expose the broker endpoint as a variable, assigned right above the broker scan.
+        LogicalVariable brokerEndpointVar = context.newVar();
+        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
+        // brokerEndpointVar now holds the broker endpoint for use farther up in the plan
+
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
+        context.computeAndSetTypeEnvironmentForOperator(eOp);
+
+        // The subscription id is the first variable produced by the subscriptions scan.
+        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+
+        // Let the three variables needed by the notifier survive the projection.
+        badProject.getVariables().add(subscriptionIdVar);
+        badProject.getVariables().add(brokerEndpointVar);
+        badProject.getVariables().add(channelExecutionVar);
+        context.computeAndSetTypeEnvironmentForOperator(badProject);
+
+        // Build the notify-broker plan above the commit delegate operator.
+        DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
+                context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
+
+        opRef.setValue(dOp);
+
+        return true;
+    }
+
+    /**
+     * Builds the chain distinct(subscription id) -> group-by(broker endpoint,
+     * channel execution) with a nested listify of subscription ids ->
+     * notify-broker delegate operator, using {@code eOp} as the input of the
+     * distinct. The {@code distributeOp} argument is currently not used.
+     *
+     * @return the delegate operator wrapping the {@link NotifyBrokerOperator}
+     */
+    private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
+            LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
+            ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+            throws AlgebricksException {
+        // distinct on the subscription id
+        List<Mutable<ILogicalExpression>> distinctExprs = new ArrayList<Mutable<ILogicalExpression>>();
+        distinctExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
+        DistinctOperator distinctOp = new DistinctOperator(distinctExprs);
+
+        // group by broker endpoint and channel execution, fed by the distinct
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList =
+                new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList =
+                new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
+
+        GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
+        groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
+
+        // nested plan of the group-by: listify the subscription ids of each group
+        NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
+                new MutableObject<ILogicalOperator>(groupbyOp));
+        //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
+        //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
+        LogicalVariable subscriptionListVar = context.newVar();
+        List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
+        aggVars.add(subscriptionListVar);
+        AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
+                AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
+        funAgg.getArguments()
+                .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
+        List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
+        AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
+        listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
+
+        nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
+
+        // the notify-broker operator, wrapped in a delegate operator on top of the group-by
+        NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
+                channelExecutionVar);
+        EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
+        NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
+        notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
+        DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
+        extensionOp.setPhysicalOperator(notifyBrokerPOp);
+        extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
+
+        // the bottom of the notify branch (the distinct) reads from the commit delegate operator
+        distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+
+        // compute type environments bottom up
+        context.computeAndSetTypeEnvironmentForOperator(distinctOp);
+        context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
+        context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
+        context.computeAndSetTypeEnvironmentForOperator(listifyOp);
+        context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+        return extensionOp;
+    }
+
+    /**
+     * Inserts, between the broker scan and the operator directly above it, an
+     * assign that binds {@code brokerEndpointVar} to a field-access-by-name of
+     * {@link BADConstants#BrokerEndPoint} on the scan's variable at index 2.
+     *
+     * @param opAboveBrokersScan
+     *            operator that has the broker scan among its direct inputs
+     * @return the newly inserted assign operator
+     * @throws IllegalStateException
+     *             if none of the inputs of {@code opAboveBrokersScan} is a broker scan
+     */
+    @SuppressWarnings("unchecked")
+    private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
+            AbstractLogicalOperator opAboveBrokersScan) {
+        Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
+                new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
+        DataSourceScanOperator brokerScan = null;
+        int index = 0;
+        for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
+            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                brokerScan = (DataSourceScanOperator) subOp.getValue();
+                break;
+            }
+            index++;
+        }
+        if (brokerScan == null) {
+            // Fail with a clear message instead of a bare NullPointerException below.
+            throw new IllegalStateException("No broker scan found among the inputs of the given operator");
+        }
+        // NOTE(review): index 2 is presumably the record variable of the broker scan - confirm.
+        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
+                new VariableReferenceExpression(brokerScan.getVariables().get(2)));
+
+        ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
+        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
+        varArray.add(brokerEndpointVar);
+        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
+        exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
+
+        AssignOperator assignOp = new AssignOperator(varArray, exprArray);
+
+        // Place assignOp between the scan and the op above it
+        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
+        opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
+
+        return assignOp;
+    }
+
+    /**
+     * Depth-first search below {@code op} for the operator selected by
+     * {@code lookingForString}:
+     * <ul>
+     * <li>{@code "brokers"}: the operator that has the broker scan as a direct input;</li>
+     * <li>{@code "project"}: the first project operator;</li>
+     * <li>anything else: the subscriptions scan over the dataset with that name.</li>
+     * </ul>
+     *
+     * @return the operator found, or {@code null} if there is none
+     */
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+        if (!op.hasInputs()) {
+            return null;
+        }
+        for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) subOp.getValue();
+            if (FIND_BROKERS.equals(lookingForString)) {
+                // here the PARENT of the broker scan is the operator of interest
+                if (isBrokerScan(child)) {
+                    return op;
+                }
+            } else if (FIND_PROJECT.equals(lookingForString)) {
+                if (child.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                    return child;
+                }
+            } else if (isSubscriptionsScan(child, lookingForString)) {
+                return child;
+            }
+            // no match at this level: continue the search below this input
+            AbstractLogicalOperator nestedOp = findOp(child, lookingForString);
+            if (nestedOp != null) {
+                return nestedOp;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return {@code true} if {@code op} is a scan of the dataset named
+     *         {@code Broker} in the {@code Metadata} dataverse
+     */
+    private boolean isBrokerScan(AbstractLogicalOperator op) {
+        if (!(op instanceof DataSourceScanOperator)
+                || !(((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource)) {
+            return false;
+        }
+        DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+        return METADATA_DATAVERSE.equals(dds.getDataset().getDataverseName())
+                && "Broker".equals(dds.getDataset().getDatasetName());
+    }
+
+    /**
+     * @return {@code true} if {@code op} is a scan of the dataset named
+     *         {@code subscriptionsName} whose item type is
+     *         {@code Metadata.ChannelSubscriptionsType}
+     */
+    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+        if (!(op instanceof DataSourceScanOperator)
+                || !(((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource)) {
+            return false;
+        }
+        DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+        return METADATA_DATAVERSE.equals(dds.getDataset().getItemTypeDataverseName())
+                && "ChannelSubscriptionsType".equals(dds.getDataset().getItemTypeName())
+                && dds.getDataset().getDatasetName().equals(subscriptionsName);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
new file mode 100644
index 0000000..d281b49
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.Collection;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Delegate logical operator for the "notify-brokers" step of a plan. It only carries the three input
+ * variables that step reads (broker endpoint, subscription ids, channel execution value); it uses all
+ * three and produces no variables of its own.
+ */
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
+    private final LogicalVariable brokerEndpointVar;
+    private final LogicalVariable subscriptionIdVar;
+    private final LogicalVariable channelExecutionVar;
+
+    /**
+     * @param brokerEndpointVar
+     *            variable holding the broker endpoint
+     * @param subscriptionIdVar
+     *            variable holding the subscription ids
+     * @param channelExecutionVar
+     *            variable holding the channel execution value
+     */
+    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
+            LogicalVariable channelExecutionVar) {
+        this.brokerEndpointVar = brokerEndpointVar;
+        this.subscriptionIdVar = subscriptionIdVar;
+        this.channelExecutionVar = channelExecutionVar;
+    }
+
+    /** @return the variable holding the broker endpoint */
+    public LogicalVariable getBrokerEndpointVariable() {
+        return brokerEndpointVar;
+    }
+
+    /** @return the variable holding the subscription ids */
+    public LogicalVariable getSubscriptionVariable() {
+        return subscriptionIdVar;
+    }
+
+    /** @return the variable holding the channel execution value */
+    public LogicalVariable getChannelExecutionVariable() {
+        return channelExecutionVar;
+    }
+
+    @Override
+    public IOperatorDelegate newInstance() {
+        // A fresh delegate sharing the same three variables.
+        return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        // The operator holds no expressions, so nothing can be transformed.
+        return false;
+    }
+
+    @Override
+    public void getUsedVariables(Collection<LogicalVariable> usedVars) {
+        usedVars.add(subscriptionIdVar);
+        usedVars.add(brokerEndpointVar);
+        usedVars.add(channelExecutionVar);
+    }
+
+    @Override
+    public void getProducedVariables(Collection<LogicalVariable> producedVars) {
+        // Intentionally empty: this operator produces no variables.
+    }
+
+    @Override
+    public String toString() {
+        return "notify-brokers";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
new file mode 100644
index 0000000..12d5ae2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Physical operator paired with {@link NotifyBrokerOperator}. During job generation it resolves the
+ * logical operator's three variables to columns of the first input schema and contributes a
+ * {@link NotifyBrokerRuntimeFactory} as a micro operator.
+ */
+public class NotifyBrokerPOperator extends AbstractPhysicalOperator {
+
+    private final EntityId entityId;
+
+    /**
+     * @param entityId
+     *            identifier passed through unchanged to the runtime factory
+     */
+    public NotifyBrokerPOperator(EntityId entityId) {
+        this.entityId = entityId;
+    }
+
+    @Override
+    public String toString() {
+        return "NOTIFY_BROKERS";
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.DELEGATE_OPERATOR;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        // Delivered properties are a copy of whatever the single child delivers.
+        ILogicalOperator child = op.getInputs().get(0).getValue();
+        deliveredProperties = ((AbstractLogicalOperator) child).getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        DelegateOperator delegateOp = (DelegateOperator) op;
+        NotifyBrokerOperator notifyOp = (NotifyBrokerOperator) delegateOp.getDelegate();
+        LogicalVariable subVar = notifyOp.getSubscriptionVariable();
+        LogicalVariable brokerVar = notifyOp.getBrokerEndpointVariable();
+        LogicalVariable executionVar = notifyOp.getChannelExecutionVariable();
+
+        // Map each variable to its column position in the first (only) input schema.
+        IOperatorSchema inputSchema = inputSchemas[0];
+        int brokerColumn = inputSchema.findVariable(brokerVar);
+        int subColumn = inputSchema.findVariable(subVar);
+        int executionColumn = inputSchema.findVariable(executionVar);
+
+        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(
+                new ColumnAccessEvalFactory(brokerColumn), new ColumnAccessEvalFactory(subColumn),
+                new ColumnAccessEvalFactory(executionColumn), entityId);
+
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+        builder.contributeMicroOperator(op, runtime, recDesc);
+
+        // Connect the child's output 0 to this operator's input 0.
+        builder.contributeGraphEdge(op.getInputs().get(0).getValue(), 0, delegateOp, 0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
new file mode 100644
index 0000000..8634e4c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+/**
+ * Push runtime that, for each tuple of every incoming frame, reads a broker endpoint, a list of
+ * subscription ids and a channel execution time from the tuple and hands them to
+ * {@link ChannelJobService#sendBrokerNotificationsForChannel}. It writes nothing downstream.
+ */
+public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    // One stream pair is reused for every field; it is re-pointed before each deserialization.
+    private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+    private final DataInputStream di = new DataInputStream(bbis);
+    private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
+            new AOrderedListType(BuiltinType.AUUID, null));
+
+    private final IPointable brokerArg = new VoidPointable();
+    private final IPointable subscriptionArg = new VoidPointable();
+    private final IPointable executionArg = new VoidPointable();
+    private final IScalarEvaluator brokerEval;
+    private final IScalarEvaluator subscriptionEval;
+    private final IScalarEvaluator executionEval;
+    private final ActiveManager activeManager;
+    private final EntityId entityId;
+
+    /**
+     * @param ctx
+     *            task context used to create the evaluators and to look up the active manager
+     * @param brokerEvalFactory
+     *            factory for the evaluator yielding the broker endpoint
+     * @param subEvalFactory
+     *            factory for the evaluator yielding the subscription id list
+     * @param channelExecutionEvalFactory
+     *            factory for the evaluator yielding the channel execution time
+     * @param activeJobId
+     *            entity id passed along with every notification
+     * @throws HyracksDataException
+     *             if an evaluator cannot be created
+     */
+    public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
+            IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId activeJobId) throws HyracksDataException {
+        this.tRef = new FrameTupleReference();
+        this.brokerEval = brokerEvalFactory.createScalarEvaluator(ctx);
+        this.subscriptionEval = subEvalFactory.createScalarEvaluator(ctx);
+        this.executionEval = channelExecutionEvalFactory.createScalarEvaluator(ctx);
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        this.activeManager = (ActiveManager) appContext.getActiveManager();
+        this.entityId = activeJobId;
+    }
+
+    /**
+     * Points the shared input stream at the frame buffer, one byte past the start offset of {@code value}.
+     * The skipped byte is presumably the serialized type tag -- TODO confirm.
+     */
+    private void positionStreamAfterFirstByte(IPointable value) {
+        bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), value.getStartOffset() + 1);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        // nothing to open
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        final int tupleCount = tAccess.getTupleCount();
+        for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
+            tRef.reset(tAccess, tupleIndex);
+
+            brokerEval.evaluate(tRef, brokerArg);
+            subscriptionEval.evaluate(tRef, subscriptionArg);
+            executionEval.evaluate(tRef, executionArg);
+
+            positionStreamAfterFirstByte(brokerArg);
+            AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
+
+            positionStreamAfterFirstByte(subscriptionArg);
+            AOrderedList subs = subSerDes.deserialize(di);
+
+            positionStreamAfterFirstByte(executionArg);
+            ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+
+            String executionTimeString;
+            try {
+                executionTimeString = executionTime.toSimpleString();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+
+            ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
+                    executionTimeString);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // nothing to close
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        // The index is ignored; the accessor is rebuilt for the given descriptor.
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // nothing buffered, nothing to flush
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
new file mode 100644
index 0000000..d5d05cf
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Serializable factory that creates a {@link NotifyBrokerRuntime} from three evaluator factories and an
+ * entity id.
+ */
+public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final EntityId entityId;
+    private final IScalarEvaluatorFactory brokerEvalFactory;
+    private final IScalarEvaluatorFactory subEvalFactory;
+    private final IScalarEvaluatorFactory channelExecutionEvalFactory;
+
+    /**
+     * @param brokerEvalFactory
+     *            factory for the broker endpoint evaluator
+     * @param subEvalFactory
+     *            factory for the subscription list evaluator
+     * @param channelExecutionEvalFactory
+     *            factory for the channel execution evaluator
+     * @param entityId
+     *            entity id forwarded to each created runtime
+     */
+    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
+            IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+        this.entityId = entityId;
+        this.brokerEvalFactory = brokerEvalFactory;
+        this.subEvalFactory = subEvalFactory;
+        this.channelExecutionEvalFactory = channelExecutionEvalFactory;
+    }
+
+    /**
+     * Builds a new runtime bound to the given task context.
+     */
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
+    }
+
+    @Override
+    public String toString() {
+        return "notify-broker";
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
new file mode 100644
index 0000000..8093977
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator descriptor for a repetitive channel. It has no inputs and no outputs; its push runtime is a
+ * {@link RepetitiveChannelOperatorNodePushable} built from the channel's job specification, duration
+ * string and the address given at construction.
+ */
+public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
+
+    /** Identifies the channel: extension name, dataverse name and channel name. */
+    protected final EntityId entityId;
+
+    /** Job specification handed to the node pushable. */
+    protected final JobSpecification jobSpec;
+
+    private final String duration;
+
+    // Address passed to the node pushable, which opens a HyracksConnection with it.
+    private final String strIP;
+    private final int port;
+
+    /**
+     * @param spec
+     *            job specification this descriptor is registered with
+     * @param dataverseName
+     *            dataverse of the channel
+     * @param channelName
+     *            name of the channel
+     * @param duration
+     *            duration string forwarded to the node pushable
+     * @param channeljobSpec
+     *            job specification forwarded to the node pushable
+     * @param strIP
+     *            host forwarded to the node pushable
+     * @param port
+     *            port forwarded to the node pushable
+     */
+    public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
+            String duration, JobSpecification channeljobSpec, String strIP, int port) {
+        super(spec, 0, 0);
+        this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.jobSpec = channeljobSpec;
+        this.duration = duration;
+        this.strIP = strIP;
+        this.port = port;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        // The runtime id combines the entity, the pushable's simple class name and the partition number.
+        return new RepetitiveChannelOperatorNodePushable(ctx,
+                new ActiveRuntimeId(entityId, RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition),
+                jobSpec, duration, strIP, port);
+    }
+
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    public JobSpecification getJobSpec() {
+        return jobSpec;
+    }
+
+    public String getDuration() {
+        return duration;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
new file mode 100644
index 0000000..1bbe331
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Active source runtime of a repetitive channel. {@link #start()} hands the channel's job specification to
+ * {@link ChannelJobService#startJob} and then blocks until the returned scheduler has terminated;
+ * {@link #abort()} shuts that scheduler down.
+ */
+public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
+
+    // Written by start() and read by abort(). Because start() blocks, abort() necessarily runs on another
+    // thread, so the reference is volatile. It stays null until start() has obtained the real scheduler;
+    // no placeholder pool is created up front.
+    private volatile ScheduledExecutorService scheduledExecutorService;
+    private final JobSpecification jobSpec;
+    private final long duration;
+    private final HyracksConnection hcc;
+
+    /**
+     * @param ctx
+     *            task context, passed to the superclass
+     * @param runtimeId
+     *            runtime id, passed to the superclass
+     * @param channeljobSpec
+     *            job specification to run repeatedly
+     * @param duration
+     *            duration string, converted to a period by {@link ChannelJobService#findPeriod}
+     * @param strIP
+     *            host used to open the {@link HyracksConnection}
+     * @param port
+     *            port used to open the {@link HyracksConnection}
+     * @throws HyracksDataException
+     *             if the connection cannot be created
+     */
+    public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
+            JobSpecification channeljobSpec, String duration, String strIP, int port) throws HyracksDataException {
+        super(ctx, runtimeId);
+        this.jobSpec = channeljobSpec;
+        this.duration = ChannelJobService.findPeriod(duration);
+        try {
+            hcc = new HyracksConnection(strIP, port);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Starts the repeated job and blocks until its scheduler has terminated.
+     *
+     * @throws HyracksDataException
+     *             if the job cannot be started
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while waiting for termination
+     */
+    @Override
+    protected void start() throws HyracksDataException, InterruptedException {
+        final ScheduledExecutorService scheduler;
+        try {
+            scheduler = ChannelJobService.startJob(jobSpec, EnumSet.noneOf(JobFlag.class), null, hcc, duration);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        scheduledExecutorService = scheduler;
+        // Park the thread until the scheduler terminates instead of spinning on isTerminated(),
+        // which kept a CPU core busy for the whole lifetime of the channel.
+        while (!scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
+            // not terminated yet; keep waiting
+        }
+    }
+
+    /**
+     * Shuts the scheduler down, which lets a blocked {@link #start()} return. Does nothing if
+     * {@link #start()} has not yet obtained a scheduler.
+     */
+    @Override
+    protected void abort() throws HyracksDataException, InterruptedException {
+        final ScheduledExecutorService scheduler = scheduledExecutorService;
+        if (scheduler != null) {
+            scheduler.shutdown();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
new file mode 100644
index 0000000..94b4c78
--- /dev/null
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -0,0 +1,206 @@
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
+import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
+import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
+import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
+
+
+@merge
+Statement SingleStatement() throws ParseException:
+{
+ // merge area 1
+ before:
+ after:
+}
+{
+ (
+ // merge area 2
+ before:
+ after: | stmt = ChannelSubscriptionStatement())
+ {
+ // merge area 3
+ }
+}
+
+@merge
+Statement CreateStatement() throws ParseException:
+{
+ // merge area 1
+ before:
+ after:
+}
+{
+ (
+ // merge area 2
+ before:
+ after: | stmt = ChannelSpecification() | stmt = BrokerSpecification() | stmt = ProcedureSpecification())
+ {
+ // merge area 3
+ }
+}
+
+@merge
+Statement DropStatement() throws ParseException:
+{
+ // merge area 1
+ before:
+ after:
+}
+{
+ (
+ // merge area 2
+ before:
+ after: | "channel" pairId = QualifiedName() ifExists = IfExists()
+ {
+ stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
+ }
+ | "broker" pairId = QualifiedName() ifExists = IfExists()
+ {
+ stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
+ }
+ )
+ {
+ // merge area 3
+ }
+}
+
+@new
+CreateChannelStatement ChannelSpecification() throws ParseException:
+{
+ // New production, offered as an alternative by the CreateStatement merge in this file.
+ // Accepts: repetitive channel NAME using FUNCTION-SIGNATURE period FUNCTION-CALL, optionally
+ // followed by the keyword distributed, and returns a CreateChannelStatement built from those parts.
+ // NOTE: fqFunctionName is declared but never assigned or read in this production.
+ Pair<Identifier,Identifier> nameComponents = null;
+ FunctionSignature appliedFunction = null;
+ CreateChannelStatement ccs = null;
+ String fqFunctionName = null;
+ Expression period = null;
+ boolean distributed = false;
+}
+{
+ (
+ "repetitive" "channel" nameComponents = QualifiedName()
+ <USING> appliedFunction = FunctionSignature()
+ "period" period = FunctionCallExpr() ("distributed" { distributed = true; })?
+ {
+ ccs = new CreateChannelStatement(nameComponents.first,
+ nameComponents.second, appliedFunction, period, distributed);
+ }
+ )
+ {
+ return ccs;
+ }
+}
+
+
+@new
+CreateProcedureStatement ProcedureSpecification() throws ParseException:
+{
+ // New production, offered as an alternative by the CreateStatement merge in this file.
+ // Accepts: procedure NAME PARAMETER-LIST followed by a brace-delimited body expression.
+ // The body is kept as source text: the fragment between the opening and closing brace tokens is
+ // recovered with extractFragment. The parsed functionBodyExpr itself is not passed on.
+ // The signature is built from the two name parts and the number of parameters.
+ // NOTE(review): removeCurrentScope is called without a matching scope creation visible in this
+ // production; presumably ParameterList opens the scope -- confirm against the base grammar.
+ Pair<Identifier,Identifier> nameComponents = null;
+ FunctionSignature signature;
+ List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+ String functionBody;
+ Token beginPos;
+ Token endPos;
+ Expression functionBodyExpr;
+}
+{
+ "procedure" nameComponents = QualifiedName()
+ paramList = ParameterList()
+ <LEFTBRACE>
+ {
+ beginPos = token;
+ }
+ functionBodyExpr = Expression() <RIGHTBRACE>
+ {
+ endPos = token;
+ functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+ signature = new FunctionSignature(nameComponents.first.toString(), nameComponents.second.toString(), paramList.size());
+ removeCurrentScope();
+ return new CreateProcedureStatement(signature, paramList, functionBody);
+ }
+}
+
+
+
+
+@new
+CreateBrokerStatement BrokerSpecification() throws ParseException:
+{
+ // New production, offered as an alternative by the CreateStatement merge in this file.
+ // Accepts: broker NAME, the AT token, then a string literal holding the endpoint.
+ // Returns a CreateBrokerStatement built from the two name parts and the endpoint string.
+ CreateBrokerStatement cbs = null;
+ Pair<Identifier,Identifier> name = null;
+ String endPoint = null;
+}
+{
+ (
+ "broker" name = QualifiedName()
+ <AT> endPoint = StringLiteral()
+ {
+ cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+ }
+ )
+ {
+ return cbs;
+ }
+}
+
+@new
+Statement ChannelSubscriptionStatement() throws ParseException:
+{
+ // New production, offered as an alternative by the SingleStatement merge in this file.
+ // It has three alternatives:
+ // 1. subscribe TO channel, a parenthesised and possibly empty comma-separated argument list,
+ //    then ON broker. Builds a ChannelSubscribeStatement; subscriptionId is still null here.
+ // 2. unsubscribe ID-STRING FROM channel. Installs fresh dataverse and dataset lists, registers a
+ //    placeholder variable in the current scope, reads the lists back, clears the parser's
+ //    references to them, and builds a ChannelUnsubscribeStatement.
+ // 3. change subscription ID-STRING ON channel, an argument list as in 1, then TO broker.
+ //    Builds a ChannelSubscribeStatement with subscriptionId set to the given string.
+ Statement stmt = null;
+ Pair<Identifier,Identifier> nameComponents = null;
+ List<Expression> argList = new ArrayList<Expression>();
+ Expression tmp = null;
+ String id = null;
+ String subscriptionId = null;
+ Pair<Identifier,Identifier> brokerName = null;
+}
+{
+ (
+ "subscribe" <TO> nameComponents = QualifiedName()
+ <LEFTPAREN> (tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ (<COMMA> tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
+ {
+ stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+ }
+ | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+ {
+ setDataverses(new ArrayList<String>());
+ setDatasets(new ArrayList<String>());
+ VariableExpr varExp = new VariableExpr();
+ VarIdentifier var = new VarIdentifier();
+ varExp.setVar(var);
+ var.setValue("$subscriptionPlaceholder");
+ getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
+ List<String> dataverses = getDataverses();
+ List<String> datasets = getDatasets();
+ // we remove the pointer to the dataverses and datasets
+ setDataverses(null);
+ setDatasets(null);
+ stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+ }
+ | "change" "subscription" subscriptionId = StringLiteral() <ON> nameComponents = QualifiedName()
+ <LEFTPAREN> (tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ (<COMMA> tmp = Expression()
+ {
+ argList.add(tmp);
+ }
+ )*)? <RIGHTPAREN>
+ <TO> brokerName = QualifiedName()
+ {
+ stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+ }
+ )
+ {
+ return stmt;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
new file mode 100644
index 0000000..77e8afe
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Parameterized JUnit driver for the BAD runtime test suite: every test case
+ * described by 'testsuite.xml' under 'src/test/resources/runtimets' becomes
+ * one invocation of {@link #test()}.
+ */
+@RunWith(Parameterized.class)
+public class BADExecutionTest {
+
+    protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName());
+
+    /** Directory created in {@link #setUp()} and handed to the test executor on every run. */
+    protected static final String PATH_ACTUAL = "target/rttest" + File.separator;
+
+    /** Root directory (platform-specific separators) from which the test-suite XML is resolved. */
+    protected static final String PATH_BASE = StringUtils.join(
+            new String[] { "src", "test", "resources", "runtimets" }, File.separator);
+
+    /** Configuration file passed to {@link ExecutionTestUtil#setUp}. */
+    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+
+    /** Never assigned or read inside this class. */
+    protected static AsterixTransactionProperties txnProperties;
+
+    private static final TestExecutor testExecutor = new TestExecutor();
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    /** Never assigned inside this class, so it is {@code null} unless other code sets it. */
+    protected static TestGroup FailedGroup;
+
+    /** The test case bound to this parameterized instance. */
+    protected TestCaseContext tcCtx;
+
+    /**
+     * Binds one test case to this instance; invoked by the Parameterized runner
+     * with each element produced by {@link #tests()}.
+     *
+     * @param tcCtx the test case to execute
+     */
+    public BADExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    /**
+     * Creates the {@link #PATH_ACTUAL} directory and delegates environment
+     * start-up to {@link ExecutionTestUtil}.
+     *
+     * @throws Exception if the shared test environment cannot be set up
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        new File(PATH_ACTUAL).mkdirs();
+        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+    }
+
+    /**
+     * Delegates shutdown to {@link ExecutionTestUtil} and then removes the test
+     * storage files through its integration util.
+     *
+     * @throws Exception if shutdown or file removal fails
+     */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+        ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
+    }
+
+    /**
+     * Supplies the parameters for the runner: one entry per test case found in
+     * 'testsuite.xml'.
+     *
+     * @return one single-element {@code Object[]} per test case
+     * @throws Exception if the suite description cannot be built
+     */
+    @Parameters(name = "BADExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return buildTestsInXml("testsuite.xml");
+    }
+
+    /**
+     * Builds the test cases described by the given XML file, resolved against
+     * {@link #PATH_BASE}, and wraps each one as a constructor-argument array.
+     *
+     * @param xmlfile name of the test-suite XML file
+     * @return one single-element {@code Object[]} per test case, in build order
+     * @throws Exception if the test cases cannot be built
+     */
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        final Collection<Object[]> parameters = new ArrayList<>();
+        for (TestCaseContext context : new TestCaseContext.Builder().build(new File(PATH_BASE), xmlfile)) {
+            parameters.add(new Object[] { context });
+        }
+        return parameters;
+    }
+
+    /**
+     * Executes the bound test case through the shared {@link TestExecutor}.
+     *
+     * @throws Exception if the test case fails
+     */
+    @Test
+    public void test() throws Exception {
+        testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
new file mode 100644
index 0000000..ad2f1bf
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import org.apache.asterix.bad.lang.BADCompilationProvider;
+import org.apache.asterix.bad.lang.BADQueryTranslatorFactory;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.test.optimizer.OptimizerTest;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Optimizer test for the BAD extension. Reuses {@link OptimizerTest} and only
+ * replaces the class-level set-up so that the BAD compilation provider and
+ * query translator factory are installed before the integration util starts.
+ */
+@RunWith(Parameterized.class)
+public class BADOptimizerTest extends OptimizerTest {
+
+    private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName());
+
+    /**
+     * Creates a test instance for one query; all arguments are forwarded
+     * unchanged to {@link OptimizerTest}.
+     *
+     * @param queryFile    the query to compile
+     * @param expectedFile the expected result file
+     * @param actualFile   the file receiving the actual result
+     */
+    public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) {
+        super(queryFile, expectedFile, actualFile);
+    }
+
+    /**
+     * Points the inherited configuration at the BAD test configuration file,
+     * installs the BAD providers, then initializes the integration util.
+     * The statement order is kept as-is: the providers are assigned before
+     * {@code integrationUtil.init(true)} runs.
+     *
+     * @throws Exception if initialization fails
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        new File(PATH_ACTUAL).mkdirs();
+
+        extensionLangCompilationProvider = new BADCompilationProvider();
+        statementExecutorFactory = new BADQueryTranslatorFactory();
+
+        integrationUtil.init(true);
+
+        // Set the node resolver to be the identity resolver that expects node names
+        // to be node controller ids; a valid assumption in test environment.
+        final String resolverFactoryClass = IdentitiyResolverFactory.class.getName();
+        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY, resolverFactoryClass);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
new file mode 100644
index 0000000..c2f5d41
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
@@ -0,0 +1,110 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+ <metadataNode>asterix_nc1</metadataNode>
+ <store>
+ <ncId>asterix_nc1</ncId>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
+ </store>
+ <store>
+ <ncId>asterix_nc2</ncId>
+ <storeDirs>iodevice0,iodevice1</storeDirs>
+ </store>
+ <transactionLogDir>
+ <ncId>asterix_nc1</ncId>
+ <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+ </transactionLogDir>
+ <transactionLogDir>
+ <ncId>asterix_nc2</ncId>
+ <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+ </transactionLogDir>
+ <extensions>
+ <extension>
+ <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+ </extension>
+ <extension>
+ <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+ </extension>
+ <extension>
+ <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+ </extension>
+ </extensions>
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+ nodes are available)
+ before a submitted query/statement can be
+ executed. (Default = 60 seconds)
+ </description>
+ </property>
+ <property>
+ <name>log.level</name>
+ <value>WARNING</value>
+ <description>Log level for running tests/build</description>
+ </property>
+ <property>
+ <name>compiler.framesize</name>
+ <value>32768</value>
+ </property>
+ <property>
+ <name>compiler.sortmemory</name>
+ <value>327680</value>
+ </property>
+ <property>
+ <name>compiler.groupmemory</name>
+ <value>163840</value>
+ </property>
+ <property>
+ <name>compiler.joinmemory</name>
+ <value>163840</value>
+ </property>
+ <property>
+ <name>compiler.pregelix.home</name>
+ <value>~/pregelix</value>
+ </property>
+ <property>
+ <name>storage.buffercache.pagesize</name>
+ <value>32768</value>
+ <description>The page size in bytes for pages in the buffer cache.
+ (Default = "32768" // 32KB)
+ </description>
+ </property>
+ <property>
+ <name>storage.buffercache.size</name>
+ <value>33554432</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size. (Default
+ = "33554432" // 32MB)
+ </description>
+ </property>
+ <property>
+ <name>storage.memorycomponent.numpages</name>
+ <value>8</value>
+ <description>The number of pages to allocate for a memory component.
+ (Default = 8)
+ </description>
+ </property>
+ <property>
+ <name>plot.activate</name>
+ <value>false</value>
+ <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+ </description>
+ </property>
+</asterixConfiguration>
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/cluster.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/cluster.xml b/asterix-bad/src/test/resources/conf/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+ <instance_name>asterix</instance_name>
+ <store>storage</store>
+
+ <data_replication>
+ <enabled>false</enabled>
+ <replication_port>2016</replication_port>
+ <replication_factor>2</replication_factor>
+ <auto_failover>false</auto_failover>
+ <replication_time_out>30</replication_time_out>
+ </data_replication>
+
+ <master_node>
+ <id>master</id>
+ <client_ip>127.0.0.1</client_ip>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <replication_port>2016</replication_port>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <replication_port>2017</replication_port>
+ </node>
+</cluster>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/hyracks-deployment.properties b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
new file mode 100644
index 0000000..17a6772
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
@@ -0,0 +1,21 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl
+cc.ip=127.0.0.1
+cc.port=1098
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/test.properties
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/test.properties b/asterix-bad/src/test/resources/conf/test.properties
new file mode 100644
index 0000000..86269c8
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/test.properties
@@ -0,0 +1,22 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=nc1data
+nc2.stores=nc2data
+OutputDir=/tmp/asterix_output/
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
new file mode 100644
index 0000000..4dc9291
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
@@ -0,0 +1,36 @@
+/*
+ * Description : Check the Plan used by a channel
+ *               (a repetitive channel created over a two-argument function
+ *               that reads from an autogenerated-key dataset)
+ * Expected Res : Success
+ * Date : Mar 2015
+ * NOTE(review) : in NearbyTweetsContainingText, $circle is bound but never
+ *               referenced; spatial-intersect is applied to the raw $location
+ *               argument. Confirm whether $circle was intended there before
+ *               changing it, since the expected plan depends on this query text.
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+ tweetid: uuid,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ countA: int32,
+ countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) { // returns message-text of matching tweets
+ for $tweet in dataset TweetMessageuuids
+ let $circle := create-circle($location,30.0) // NOTE(review): $circle is not used below
+ where contains($tweet.message-text,$text)
+ and spatial-intersect($tweet.sender-location, $location) // compares against $location, not $circle
+ return $tweet.message-text
+};
+
+write output to nc1:"rttest/channel-create.adm";
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M"); // @2 matches the two-parameter function above
\ No newline at end of file