You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/12/07 20:59:43 UTC

[5/7] asterixdb-bad git commit: Updated to match code changes to asterix

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
new file mode 100644
index 0000000..527d65b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseBrokersSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse;
+
+    public DataverseBrokersSearchKey(String dataverse) {
+        this.dataverse = dataverse;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+    }
+
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(dataverse);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
new file mode 100644
index 0000000..ffb3ab6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseChannelsSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse;
+
+    public DataverseChannelsSearchKey(String dataverse) {
+        this.dataverse = dataverse;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+    }
+
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(dataverse);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
new file mode 100644
index 0000000..b64bf1b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+public class Procedure implements IExtensionMetadataEntity {
+    private static final long serialVersionUID = 1L;
+    public static final String LANGUAGE_AQL = "AQL";
+    public static final String LANGUAGE_JAVA = "JAVA";
+
+    public static final String RETURNTYPE_VOID = "VOID";
+    public static final String NOT_APPLICABLE = "N/A";
+
+    private final EntityId procedureId;
+    private final int arity;
+    private final List<String> params;
+    private final String body;
+    private final String returnType;
+    private final String language;
+
+    public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType,
+            String functionBody, String language) {
+        this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
+        this.params = params;
+        this.body = functionBody;
+        this.returnType = returnType == null ? RETURNTYPE_VOID : returnType;
+        this.language = language;
+        this.arity = arity;
+    }
+
+    public EntityId getEntityId() {
+        return procedureId;
+    }
+
+    public List<String> getParams() {
+        return params;
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public String getReturnType() {
+        return returnType;
+    }
+
+    public String getLanguage() {
+        return language;
+    }
+
+    public int getArity() {
+        return arity;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Procedure)) {
+            return false;
+        }
+        Procedure otherDataset = (Procedure) other;
+        if (!otherDataset.procedureId.equals(procedureId)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
new file mode 100644
index 0000000..6456170
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ProcedureSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse;
+    private final String channel;
+    private final String arity;
+
+    public ProcedureSearchKey(String dataverse, String channel, String arity) {
+        this.dataverse = dataverse;
+        this.channel = channel;
+        this.arity = arity;
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+    }
+
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(dataverse, channel, arity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
new file mode 100644
index 0000000..f2eab9b
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IACursor;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Procedure metadata entity to an ITupleReference and vice versa.
+ */
+public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> {
+    // Field indexes of serialized Procedure in a tuple.
+    // First key field.
+    public static final int PROCEDURE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+    // Second key field.
+    public static final int PROCEDURE_PROCEDURE_NAME_TUPLE_FIELD_INDEX = 1;
+    // Third key field.
+    public static final int PROCEDURE_ARITY_TUPLE_FIELD_INDEX = 2;
+
+    // Payload field containing serialized Procedure.
+    public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+    protected ProcedureTupleTranslator(boolean getTuple) {
+        super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX);
+    }
+
+    @Override
+    public Procedure getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+        byte[] serRecord = frameTuple.getFieldData(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord procedureRecord = recordSerDes.deserialize(in);
+        return createProcedureFromARecord(procedureRecord);
+    }
+
+    private Procedure createProcedureFromARecord(ARecord procedureRecord) {
+        String dataverseName =
+ ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX))
+                        .getStringValue();
+        String procedureName =
+ ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX))
+                        .getStringValue();
+        String arity = ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX)).getStringValue();
+
+        IACursor cursor = ((AOrderedList) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX)).getCursor();
+        List<String> params = new ArrayList<String>();
+        while (cursor.next()) {
+            params.add(((AString) cursor.get()).getStringValue());
+        }
+
+        String returnType = ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX))
+                        .getStringValue();
+
+        String definition = ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX))
+                        .getStringValue();
+
+        String language = ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX))
+                        .getStringValue();
+
+        return new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition,
+                language);
+
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(Procedure procedure) throws IOException, MetadataException {
+        // write the key in the first 2 fields of the tuple
+        tupleBuilder.reset();
+        aString.setValue(procedure.getEntityId().getDataverse());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        aString.setValue(procedure.getEntityId().getEntityName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        aString.setValue(procedure.getArity() + "");
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        // write the pay-load in the fourth field of the tuple
+
+        recordBuilder.reset(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(procedure.getEntityId().getDataverse());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(procedure.getEntityId().getEntityName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 2
+        fieldValue.reset();
+        aString.setValue(procedure.getArity() + "");
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX, fieldValue);
+
+        // write field 3
+        OrderedListBuilder listBuilder = new OrderedListBuilder();
+        ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
+        listBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE
+                .getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX]);
+        for (String param : procedure.getParams()) {
+            itemValue.reset();
+            aString.setValue(param);
+            stringSerde.serialize(aString, itemValue.getDataOutput());
+            listBuilder.addItem(itemValue);
+        }
+        fieldValue.reset();
+        listBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX, fieldValue);
+
+        // write field 4
+        fieldValue.reset();
+        aString.setValue(procedure.getReturnType());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX, fieldValue);
+
+        // write field 5
+        fieldValue.reset();
+        aString.setValue(procedure.getBody());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX, fieldValue);
+
+        // write field 6
+        fieldValue.reset();
+        aString.setValue(procedure.getLanguage());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX, fieldValue);
+
+        // write record
+        recordBuilder.write(tupleBuilder.getDataOutput(), true);
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
new file mode 100644
index 0000000..89f0d20
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
+import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            return false;
+        }
+        AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
+            return false;
+        }
+        DelegateOperator eOp = (DelegateOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+        if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+            return false;
+        }
+        InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
+        if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
+            return false;
+        }
+        DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+        String datasetName = dds.getDataset().getDatasetName();
+        if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+                || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
+                || !datasetName.endsWith("Results")) {
+            return false;
+        }
+        String channelDataverse = dds.getDataset().getDataverseName();
+        //Now we know that we are inserting into results
+
+        String channelName = datasetName.substring(0, datasetName.length() - 7);
+        String subscriptionsName = channelName + "Subscriptions";
+        //TODO: Can we check here to see if there is a channel with such a name?
+
+        DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
+        if (subscriptionsScan == null) {
+            return false;
+        }
+
+        //Now we want to make sure and set the commit to be a nonsink commit
+        ((CommitOperator) eOp.getDelegate()).setSink(false);
+
+        //Now we need to get the broker EndPoint 
+        LogicalVariable brokerEndpointVar = context.newVar();
+        AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
+        AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
+        //now brokerNameVar holds the brokerName for use farther up in the plan
+
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
+        context.computeAndSetTypeEnvironmentForOperator(eOp);
+
+        //get subscriptionIdVar
+        LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
+
+        //The channelExecutionTime is created just before the scan
+        LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
+                .getVariables().get(0);
+
+        ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
+        badProject.getVariables().add(subscriptionIdVar);
+        badProject.getVariables().add(brokerEndpointVar);
+        badProject.getVariables().add(channelExecutionVar);
+        context.computeAndSetTypeEnvironmentForOperator(badProject);
+
+        //Create my brokerNotify plan above the extension Operator
+        DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
+                context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
+
+        opRef.setValue(dOp);
+
+        return true;
+    }
+
+    private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
+            LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
+            ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
+                    throws AlgebricksException {
+        //create the Distinct Op
+        ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
+        VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar);
+        expressions.add(new MutableObject<ILogicalExpression>(vExpr));
+        DistinctOperator distinctOp = new DistinctOperator(expressions);
+
+        //create the GroupBy Op
+        //And set the distinct as input
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
+
+        //create group by operator
+        GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
+        groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
+        groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
+
+        //create nested plan for subscription ids in group by
+        NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
+                new MutableObject<ILogicalOperator>(groupbyOp));
+        //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
+        //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
+        LogicalVariable subscriptionListVar = context.newVar();
+        List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
+        aggVars.add(subscriptionListVar);
+        AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
+                AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
+        funAgg.getArguments()
+                .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
+        List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
+        AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
+        listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
+
+        //add nested plans
+        nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
+
+        //Create the NotifyBrokerOperator
+        NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
+                channelExecutionVar);
+        EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
+        NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
+        notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
+        DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp);
+        extensionOp.setPhysicalOperator(notifyBrokerPOp);
+        extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
+
+        //Set the input for the brokerNotify as the replicate operator
+        distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
+
+        //compute environment bottom up
+
+        context.computeAndSetTypeEnvironmentForOperator(distinctOp);
+        context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
+        context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
+        context.computeAndSetTypeEnvironmentForOperator(listifyOp);
+        context.computeAndSetTypeEnvironmentForOperator(extensionOp);
+
+        return extensionOp;
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
+            AbstractLogicalOperator opAboveBrokersScan) {
+        Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
+                new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
+        DataSourceScanOperator brokerScan = null;
+        int index = 0;
+        for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
+            if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                brokerScan = (DataSourceScanOperator) subOp.getValue();
+                break;
+            }
+            index++;
+        }
+        Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
+                new VariableReferenceExpression(brokerScan.getVariables().get(2)));
+
+        ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
+        ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
+        varArray.add(brokerEndpointVar);
+        ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
+        exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
+
+        AssignOperator assignOp = new AssignOperator(varArray, exprArray);
+
+        //Place assignOp between the scan and the op above it
+        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(brokerScan));
+        opAboveBrokersScan.getInputs().set(index, new MutableObject<ILogicalOperator>(assignOp));
+
+        return assignOp;
+    }
+
+    /*This function searches for the needed op
+     * If lookingForBrokers, find the op above the brokers scan
+     * Else find the suscbriptionsScan
+     */
+    private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
+        if (!op.hasInputs()) {
+            return null;
+        }
+        for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
+            if (lookingForString.equals("brokers")) {
+                if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
+                    return op;
+                } else {
+                    AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+                            lookingForString);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
+
+            } else if (lookingForString.equals("project")) {
+                if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                    return (AbstractLogicalOperator) subOp.getValue();
+                } else {
+                    AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+                            lookingForString);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
+            }
+
+            else {
+                if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
+                    return (AbstractLogicalOperator) subOp.getValue();
+                } else {
+                    AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
+                            lookingForString);
+                    if (nestedOp != null) {
+                        return nestedOp;
+                    }
+                }
+
+            }
+        }
+        return null;
+    }
+
+    private boolean isBrokerScan(AbstractLogicalOperator op) {
+        if (op instanceof DataSourceScanOperator) {
+            if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
+                DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+                if (dds.getDataset().getDataverseName().equals("Metadata")
+                        && dds.getDataset().getDatasetName().equals("Broker")) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
+        if (op instanceof DataSourceScanOperator) {
+            if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
+                DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
+                if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
+                        && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
+                    if (dds.getDataset().getDatasetName().equals(subscriptionsName)) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
new file mode 100644
index 0000000..d281b49
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.Collection;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator {
+    private final LogicalVariable subscriptionIdVar;
+    private final LogicalVariable brokerEndpointVar;
+    private final LogicalVariable channelExecutionVar;
+
+    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
+            LogicalVariable resultSetVar) {
+        this.brokerEndpointVar = brokerEndpointVar;
+        this.subscriptionIdVar = subscriptionIdVar;
+        this.channelExecutionVar = resultSetVar;
+    }
+
+    public LogicalVariable getSubscriptionVariable() {
+        return subscriptionIdVar;
+    }
+
+    public LogicalVariable getBrokerEndpointVariable() {
+        return brokerEndpointVar;
+    }
+
+    public LogicalVariable getChannelExecutionVariable() {
+        return channelExecutionVar;
+    }
+
+    @Override
+    public String toString() {
+        return "notify-brokers";
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public IOperatorDelegate newInstance() {
+        return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public void getUsedVariables(Collection<LogicalVariable> usedVars) {
+        usedVars.add(subscriptionIdVar);
+        usedVars.add(brokerEndpointVar);
+        usedVars.add(channelExecutionVar);
+    }
+
+    @Override
+    public void getProducedVariables(Collection<LogicalVariable> producedVars) {
+        // none produced
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
new file mode 100644
index 0000000..12d5ae2
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class NotifyBrokerPOperator extends AbstractPhysicalOperator {
+
+    private final EntityId entityId;
+
+    public NotifyBrokerPOperator(EntityId entityId) {
+        this.entityId = entityId;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.DELEGATE_OPERATOR;
+    }
+
+    @Override
+    public String toString() {
+        return "NOTIFY_BROKERS";
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+                    throws AlgebricksException {
+        DelegateOperator notify = (DelegateOperator) op;
+        LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
+        LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
+        LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+
+        int brokerColumn = inputSchemas[0].findVariable(brokerVar);
+        int subColumn = inputSchemas[0].findVariable(subVar);
+        int executionColumn = inputSchemas[0].findVariable(executionVar);
+
+        IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
+        IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
+        IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
+
+        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
+                channelExecutionEvalFactory, entityId);
+
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+
+        builder.contributeMicroOperator(op, runtime, recDesc);
+
+        // and contribute one edge from its child
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, notify, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
new file mode 100644
index 0000000..8634e4c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+    private final DataInputStream di = new DataInputStream(bbis);
+    private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
+            new AOrderedListType(BuiltinType.AUUID, null));
+
+    private IPointable inputArg0 = new VoidPointable();
+    private IPointable inputArg1 = new VoidPointable();
+    private IPointable inputArg2 = new VoidPointable();
+    private IScalarEvaluator eval0;
+    private IScalarEvaluator eval1;
+    private IScalarEvaluator eval2;
+    private final ActiveManager activeManager;
+    private final EntityId entityId;
+
+    public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
+            IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId activeJobId) throws HyracksDataException {
+        this.tRef = new FrameTupleReference();
+        eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
+        eval1 = subEvalFactory.createScalarEvaluator(ctx);
+        eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
+        this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getActiveManager();
+        this.entityId = activeJobId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            tRef.reset(tAccess, t);
+
+            eval0.evaluate(tRef, inputArg0);
+            eval1.evaluate(tRef, inputArg1);
+            eval2.evaluate(tRef, inputArg2);
+
+            int serBrokerOffset = inputArg0.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
+            AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
+
+            int serSubOffset = inputArg1.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
+            AOrderedList subs = subSerDes.deserialize(di);
+
+            int resultSetOffset = inputArg2.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
+            ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+            String executionTimeString;
+            try {
+                executionTimeString = executionTime.toSimpleString();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+
+            ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
+                    executionTimeString);
+
+        }
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
new file mode 100644
index 0000000..d5d05cf
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IScalarEvaluatorFactory brokerEvalFactory;
+    private final IScalarEvaluatorFactory subEvalFactory;
+    private final IScalarEvaluatorFactory channelExecutionEvalFactory;
+    private final EntityId entityId;
+
+    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
+            IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+        this.brokerEvalFactory = brokerEvalFactory;
+        this.subEvalFactory = subEvalFactory;
+        this.channelExecutionEvalFactory = channelExecutionEvalFactory;
+        this.entityId = entityId;
+    }
+
+    @Override
+    public String toString() {
+        return "notify-broker";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
new file mode 100644
index 0000000..8093977
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
+
+    /** The unique identifier of the job. **/
+    protected final EntityId entityId;
+
+    protected final JobSpecification jobSpec;
+
+    private final String duration;
+
+    private String strIP;
+    private int port;
+
+    public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
+            String duration, JobSpecification channeljobSpec, String strIP, int port) {
+        super(spec, 0, 0);
+        this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.jobSpec = channeljobSpec;
+        this.duration = duration;
+        this.strIP = strIP;
+        this.port = port;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
+                RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition);
+        return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
+    }
+
+    public String getDuration() {
+        return duration;
+    }
+
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    public JobSpecification getJobSpec() {
+        return jobSpec;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
new file mode 100644
index 0000000..1bbe331
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
+
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+    private final JobSpecification jobSpec;
+    private long duration;
+    private final HyracksConnection hcc;
+
+    public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
+            JobSpecification channeljobSpec, String duration, String strIP, int port) throws HyracksDataException {
+        super(ctx, runtimeId);
+        this.jobSpec = channeljobSpec;
+        this.duration = ChannelJobService.findPeriod(duration);
+        try {
+            hcc = new HyracksConnection(strIP, port);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+
+    @Override
+    protected void start() throws HyracksDataException, InterruptedException {
+        try {
+            scheduledExecutorService =
+                    ChannelJobService.startJob(jobSpec, EnumSet.noneOf(JobFlag.class), null, hcc, duration);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        while (!scheduledExecutorService.isTerminated()) {
+
+        }
+
+    }
+
+    @Override
+    protected void abort() throws HyracksDataException, InterruptedException {
+        scheduledExecutorService.shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
new file mode 100644
index 0000000..94b4c78
--- /dev/null
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -0,0 +1,206 @@
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
+import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
+import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
+import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
+
+
+@merge
+Statement SingleStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | stmt = ChannelSubscriptionStatement())
+  {
+    // merge area 3
+  }
+}
+
+@merge
+Statement CreateStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | stmt = ChannelSpecification() | stmt = BrokerSpecification() | stmt = ProcedureSpecification())
+  {
+    // merge area 3
+  }
+}
+
+@merge
+Statement DropStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | "channel" pairId = QualifiedName() ifExists = IfExists()
+      {
+        stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
+      }
+              | "broker" pairId = QualifiedName() ifExists = IfExists()
+      {
+        stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);	
+      }
+      )
+  {
+    // merge area 3
+  }
+}
+
+@new
+CreateChannelStatement ChannelSpecification() throws ParseException:
+{
+  Pair<Identifier,Identifier> nameComponents = null;
+  FunctionSignature appliedFunction = null;
+  CreateChannelStatement ccs = null;
+  String fqFunctionName = null;
+  Expression period = null;
+  boolean distributed = false;
+}
+{
+  (
+    "repetitive" "channel"  nameComponents = QualifiedName()
+    <USING> appliedFunction = FunctionSignature()
+    "period" period = FunctionCallExpr() ("distributed" { distributed = true; })?
+    {
+      ccs = new CreateChannelStatement(nameComponents.first,
+                                   nameComponents.second, appliedFunction, period, distributed);
+    }
+  )
+    {
+      return ccs;
+    }
+}
+
+
+@new
+CreateProcedureStatement ProcedureSpecification() throws ParseException:
+{
+  Pair<Identifier,Identifier> nameComponents = null;
+  FunctionSignature signature;
+  List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+  String functionBody;
+  Token beginPos;
+  Token endPos;
+  Expression functionBodyExpr;
+}
+{
+    "procedure" nameComponents = QualifiedName()
+     paramList = ParameterList()
+    <LEFTBRACE>
+  {
+     beginPos = token;
+  }
+  functionBodyExpr = Expression() <RIGHTBRACE>
+    {
+      endPos = token;
+      functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+      signature = new FunctionSignature(nameComponents.first.toString(), nameComponents.second.toString(), paramList.size());
+      removeCurrentScope();
+      return new CreateProcedureStatement(signature, paramList, functionBody);
+    }
+}
+
+
+
+
+@new
+CreateBrokerStatement BrokerSpecification() throws ParseException:
+{
+  CreateBrokerStatement cbs = null;
+  Pair<Identifier,Identifier> name = null;
+  String endPoint = null;
+}
+{
+  (
+    "broker"  name = QualifiedName()
+    <AT>  endPoint = StringLiteral()
+    {
+      cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+    }
+  )
+    {
+      return cbs;
+    }
+}
+
+@new
+Statement ChannelSubscriptionStatement() throws ParseException:
+{
+  Statement stmt = null;
+  Pair<Identifier,Identifier> nameComponents = null;
+  List<Expression> argList = new ArrayList<Expression>();
+  Expression tmp = null;
+  String id = null;
+  String subscriptionId = null;
+  Pair<Identifier,Identifier> brokerName = null;
+}
+{
+  (
+  "subscribe" <TO> nameComponents = QualifiedName()
+   <LEFTPAREN> (tmp = Expression()
+   {
+      argList.add(tmp);
+   }
+   (<COMMA> tmp = Expression()
+   {
+      argList.add(tmp);
+   }
+   )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
+   {
+      stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+   }
+   | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+      {
+        setDataverses(new ArrayList<String>());
+        setDatasets(new ArrayList<String>());
+        VariableExpr varExp = new VariableExpr();
+        VarIdentifier var = new VarIdentifier();
+        varExp.setVar(var);
+        var.setValue("$subscriptionPlaceholder");
+        getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
+        List<String> dataverses = getDataverses();
+        List<String> datasets = getDatasets();
+        // we remove the pointer to the dataverses and datasets
+        setDataverses(null);
+        setDatasets(null);
+        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+      }
+     | "change" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents = QualifiedName()
+       <LEFTPAREN> (tmp = Expression()
+       {
+         argList.add(tmp);
+       }
+       (<COMMA> tmp = Expression()
+       {
+         argList.add(tmp);
+       }
+       )*)? <RIGHTPAREN>
+        <TO> brokerName = QualifiedName()
+      {
+        stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+      }
+    )
+    {
+      return stmt;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
new file mode 100644
index 0000000..77e8afe
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime test cases under 'src/test/resources/runtimets'.
+ */
+@RunWith(Parameterized.class)
+public class BADExecutionTest {
+
+    protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName());
+
+    protected static final String PATH_ACTUAL = "target/rttest" + File.separator;
+    protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
+            File.separator);
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+
+    protected static AsterixTransactionProperties txnProperties;
+    private static final TestExecutor testExecutor = new TestExecutor();
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    protected static TestGroup FailedGroup;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+        ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
+    }
+
+    @Parameters(name = "BADExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return buildTestsInXml("testsuite.xml");
+    }
+
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        TestCaseContext.Builder b = new TestCaseContext.Builder();
+        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+            testArgs.add(new Object[] { ctx });
+        }
+        return testArgs;
+
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public BADExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
new file mode 100644
index 0000000..ad2f1bf
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import org.apache.asterix.bad.lang.BADCompilationProvider;
+import org.apache.asterix.bad.lang.BADQueryTranslatorFactory;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.test.optimizer.OptimizerTest;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BADOptimizerTest extends OptimizerTest {
+
+    private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName());
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        final File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+
+        extensionLangCompilationProvider = new BADCompilationProvider();
+        statementExecutorFactory = new BADQueryTranslatorFactory();
+
+        integrationUtil.init(true);
+        // Set the node resolver to be the identity resolver that expects node names
+        // to be node controller ids; a valid assumption in test environment.
+        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
+                IdentitiyResolverFactory.class.getName());
+    }
+
+    public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) {
+        super(queryFile, expectedFile, actualFile);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
new file mode 100644
index 0000000..c2f5d41
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
@@ -0,0 +1,110 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+  <metadataNode>asterix_nc1</metadataNode>
+  <store>
+    <ncId>asterix_nc1</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <store>
+    <ncId>asterix_nc2</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <transactionLogDir>
+    <ncId>asterix_nc1</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+  </transactionLogDir>
+  <transactionLogDir>
+    <ncId>asterix_nc2</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+  </transactionLogDir>
+  <extensions>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+    </extension>
+  </extensions>
+  <property>
+    <name>max.wait.active.cluster</name>
+    <value>60</value>
+    <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+      nodes are available)
+      before a submitted query/statement can be
+      executed. (Default = 60 seconds)
+    </description>
+  </property>
+  <property>
+    <name>log.level</name>
+    <value>WARNING</value>
+    <description>Log level for running tests/build</description>
+  </property>
+  <property>
+    <name>compiler.framesize</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>compiler.sortmemory</name>
+    <value>327680</value>
+  </property>
+  <property>
+    <name>compiler.groupmemory</name>
+    <value>163840</value>
+  </property>
+  <property>
+    <name>compiler.joinmemory</name>
+    <value>163840</value>
+  </property>
+  <property>
+    <name>compiler.pregelix.home</name>
+    <value>~/pregelix</value>
+  </property>
+  <property>
+    <name>storage.buffercache.pagesize</name>
+    <value>32768</value>
+    <description>The page size in bytes for pages in the buffer cache.
+      (Default = "32768" // 32KB)
+    </description>
+  </property>
+  <property>
+    <name>storage.buffercache.size</name>
+    <value>33554432</value>
+    <description>The size of memory allocated to the disk buffer cache.
+      The value should be a multiple of the buffer cache page size(Default
+      = "33554432" // 32MB)
+    </description>
+  </property>
+  <property>
+    <name>storage.memorycomponent.numpages</name>
+    <value>8</value>
+    <description>The number of pages to allocate for a memory component.
+      (Default = 8)
+    </description>
+  </property>
+  <property>
+    <name>plot.activate</name>
+    <value>false</value>
+    <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+    </description>
+  </property>
+</asterixConfiguration>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/cluster.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/cluster.xml b/asterix-bad/src/test/resources/conf/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+  <instance_name>asterix</instance_name>
+  <store>storage</store>
+
+  <data_replication>
+    <enabled>false</enabled>
+    <replication_port>2016</replication_port>
+    <replication_factor>2</replication_factor>
+    <auto_failover>false</auto_failover>
+    <replication_time_out>30</replication_time_out>
+  </data_replication>
+
+  <master_node>
+    <id>master</id>
+    <client_ip>127.0.0.1</client_ip>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <client_port>1098</client_port>
+    <cluster_port>1099</cluster_port>
+    <http_port>8888</http_port>
+  </master_node>
+  <node>
+    <id>nc1</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2016</replication_port>
+  </node>
+  <node>
+    <id>nc2</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2017</replication_port>
+  </node>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/hyracks-deployment.properties b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
new file mode 100644
index 0000000..17a6772
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties
@@ -0,0 +1,21 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl
+cc.ip=127.0.0.1
+cc.port=1098

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/test.properties
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/test.properties b/asterix-bad/src/test/resources/conf/test.properties
new file mode 100644
index 0000000..86269c8
--- /dev/null
+++ b/asterix-bad/src/test/resources/conf/test.properties
@@ -0,0 +1,22 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=nc1data
+nc2.stores=nc2data
+OutputDir=/tmp/asterix_output/

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
new file mode 100644
index 0000000..4dc9291
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql
@@ -0,0 +1,36 @@
+/*
+ * Description  : Check the Plan used by a channel
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+write output to nc1:"rttest/channel-create.adm";
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file