You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/09/09 20:54:15 UTC

[13/15] asterixdb-bad git commit: Fixed structure

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
new file mode 100644
index 0000000..c680988
--- /dev/null
+++ b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.Collection;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
+    private final LogicalVariable subscriptionIdVar;
+    private final LogicalVariable brokerEndpointVar;
+    private final LogicalVariable channelExecutionVar;
+
+    public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
+            LogicalVariable resultSetVar) {
+        this.brokerEndpointVar = brokerEndpointVar;
+        this.subscriptionIdVar = subscriptionIdVar;
+        this.channelExecutionVar = resultSetVar;
+    }
+
+    public LogicalVariable getSubscriptionVariable() {
+        return subscriptionIdVar;
+    }
+
+    public LogicalVariable getBrokerEndpointVariable() {
+        return brokerEndpointVar;
+    }
+
+    public LogicalVariable getChannelExecutionVariable() {
+        return channelExecutionVar;
+    }
+
+    @Override
+    public String toString() {
+        return "notify-brokers";
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public IOperatorExtension newInstance() {
+        return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public void getUsedVariables(Collection<LogicalVariable> usedVars) {
+        usedVars.add(subscriptionIdVar);
+        usedVars.add(brokerEndpointVar);
+        usedVars.add(channelExecutionVar);
+    }
+
+    @Override
+    public void getProducedVariables(Collection<LogicalVariable> producedVars) {
+        // none produced
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
new file mode 100644
index 0000000..753ece7
--- /dev/null
+++ b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class NotifyBrokerPOperator extends AbstractPhysicalOperator {
+
+    private final EntityId entityId;
+
+    public NotifyBrokerPOperator(EntityId entityId) {
+        this.entityId = entityId;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    @Override
+    public String toString() {
+        return "NOTIFY_BROKERS";
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+                    throws AlgebricksException {
+        ExtensionOperator notify = (ExtensionOperator) op;
+        LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
+        LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
+        LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
+
+        int brokerColumn = inputSchemas[0].findVariable(brokerVar);
+        int subColumn = inputSchemas[0].findVariable(subVar);
+        int executionColumn = inputSchemas[0].findVariable(executionVar);
+
+        IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
+        IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
+        IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
+
+        NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
+                channelExecutionEvalFactory, entityId);
+
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
+
+        builder.contributeMicroOperator(op, runtime, recDesc);
+
+        // and contribute one edge from its child
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, notify, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
new file mode 100644
index 0000000..d55080c
--- /dev/null
+++ b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+
+public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+
+    private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+    private final DataInputStream di = new DataInputStream(bbis);
+    private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
+            new AOrderedListType(BuiltinType.AUUID, null));
+
+    private IPointable inputArg0 = new VoidPointable();
+    private IPointable inputArg1 = new VoidPointable();
+    private IPointable inputArg2 = new VoidPointable();
+    private IScalarEvaluator eval0;
+    private IScalarEvaluator eval1;
+    private IScalarEvaluator eval2;
+    private final ActiveManager activeManager;
+    private final EntityId entityId;
+    private ChannelJobService channelJobService;
+
+    public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
+            IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
+            EntityId activeJobId) throws AlgebricksException {
+        this.tRef = new FrameTupleReference();
+        eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
+        eval1 = subEvalFactory.createScalarEvaluator(ctx);
+        eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
+        this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getActiveManager();
+        this.entityId = activeJobId;
+        channelJobService = new ChannelJobService();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            tRef.reset(tAccess, t);
+            try {
+                eval0.evaluate(tRef, inputArg0);
+                eval1.evaluate(tRef, inputArg1);
+                eval2.evaluate(tRef, inputArg2);
+            } catch (AlgebricksException e) {
+                throw new HyracksDataException(e);
+            }
+            int serBrokerOffset = inputArg0.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
+            AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
+
+            int serSubOffset = inputArg1.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
+            AOrderedList subs = subSerDes.deserialize(di);
+
+            int resultSetOffset = inputArg2.getStartOffset();
+            bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
+            ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
+            String executionTimeString = executionTime.toSimpleString();
+
+            channelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
+                    executionTimeString);
+
+        }
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        return;
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
new file mode 100644
index 0000000..d5452d4
--- /dev/null
+++ b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.bad.runtime;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IScalarEvaluatorFactory brokerEvalFactory;
+    private final IScalarEvaluatorFactory subEvalFactory;
+    private final IScalarEvaluatorFactory channelExecutionEvalFactory;
+    private final EntityId entityId;
+
+    public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
+            IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
+        this.brokerEvalFactory = brokerEvalFactory;
+        this.subEvalFactory = subEvalFactory;
+        this.channelExecutionEvalFactory = channelExecutionEvalFactory;
+        this.entityId = entityId;
+    }
+
+    @Override
+    public String toString() {
+        return "notify-broker";
+    }
+
+    @Override
+    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
+        return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
new file mode 100644
index 0000000..6521ecb
--- /dev/null
+++ b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * A repetitive channel operator, which uses a Java timer to run a given query periodically
+ */
+public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
+
+    /** The unique identifier of the job. **/
+    protected final EntityId entityId;
+
+    protected final JobSpecification jobSpec;
+
+    private final String duration;
+
+    private String strIP;
+    private int port;
+
+    public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
+            String duration, JobSpecification channeljobSpec, String strIP, int port) {
+        super(spec, 0, 0);
+        this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.jobSpec = channeljobSpec;
+        this.duration = duration;
+        this.strIP = strIP;
+        this.port = port;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
+                RepetitiveChannelOperatorDescriptor.class.getSimpleName(), partition);
+        try {
+            return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
+        } catch (AsterixException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public String getDuration() {
+        return duration;
+    }
+
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    public JobSpecification getJobSpec() {
+        return jobSpec;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
new file mode 100644
index 0000000..873d2e7
--- /dev/null
+++ b/asterix-opt/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.runtime;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
+
+    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
+
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+    private final JobSpecification jobSpec;
+    private long duration;
+    private ChannelJobService channelJobService;
+    private String strIP;
+    private int port;
+
+    public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
+            JobSpecification channeljobSpec, String duration, String strIP, int port) throws AsterixException {
+        super(ctx, runtimeId);
+        this.jobSpec = channeljobSpec;
+        this.duration = findPeriod(duration);
+        //TODO: we should share channelJobService as a single instance
+        //And only create one hcc
+        channelJobService = new ChannelJobService();
+        this.strIP = strIP;
+        this.port = port;
+    }
+
+    public void executeJob() throws Exception {
+        LOGGER.info("Executing Job: " + runtimeId.toString());
+        channelJobService.runChannelJob(jobSpec, strIP, port);
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+            throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void start() throws HyracksDataException, InterruptedException {
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    executeJob();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, duration, duration, TimeUnit.MILLISECONDS);
+
+        while (!scheduledExecutorService.isTerminated()) {
+
+        }
+
+    }
+
+    @Override
+    protected void abort() throws HyracksDataException, InterruptedException {
+        scheduledExecutorService.shutdown();
+    }
+
+    private long findPeriod(String duration) {
+        //TODO: Allow Repetitive Channels to use YMD durations  
+        String hoursMinutesSeconds = "";
+        if (duration.indexOf('T') != -1) {
+            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+        }
+        double seconds = 0;
+        if (hoursMinutesSeconds != "") {
+            int pos = 0;
+            if (hoursMinutesSeconds.indexOf('H') != -1) {
+                Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+                seconds += (hours * 60 * 60);
+                pos = hoursMinutesSeconds.indexOf('H') + 1;
+
+            }
+            if (hoursMinutesSeconds.indexOf('M') != -1) {
+                Double minutes = Double
+                        .parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+                seconds += (minutes * 60);
+                pos = hoursMinutesSeconds.indexOf('M') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('S') != -1) {
+                Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+                seconds += (s);
+            }
+
+        }
+        return (long) (seconds * 1000);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-opt/src/main/resources/lang-extension/lang.txt b/asterix-opt/src/main/resources/lang-extension/lang.txt
new file mode 100644
index 0000000..90edb91
--- /dev/null
+++ b/asterix-opt/src/main/resources/lang-extension/lang.txt
@@ -0,0 +1,179 @@
+import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
+import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
+import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
+import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
+
+
+@merge
+Statement SingleStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | stmt = ChannelSubscriptionStatement())
+  {
+    // merge area 3
+  }
+}
+
+@merge
+Statement CreateStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | stmt = ChannelSpecification() | stmt = BrokerSpecification())
+  {
+    // merge area 3
+  }
+}
+
+@merge
+Statement DropStatement() throws ParseException:
+{
+  // merge area 1
+  before:
+  after:
+}
+{
+  (
+    // merge area 2
+    before:
+    after:    | <CHANNEL> pairId = QualifiedName() ifExists = IfExists()
+      {
+        stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
+      }
+      	      | <BROKER> pairId = QualifiedName() ifExists = IfExists()	
+      {	
+        stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);	
+      }
+      )
+  {
+    // merge area 3
+  }
+}
+
+@new
+CreateChannelStatement ChannelSpecification() throws ParseException:
+{
+  Pair<Identifier,Identifier> nameComponents = null;
+  FunctionSignature appliedFunction = null;
+  CreateChannelStatement ccs = null;
+  String fqFunctionName = null;
+  Expression period = null;
+}
+{
+  (
+    "repetitive" <CHANNEL>  nameComponents = QualifiedName()
+    <USING> appliedFunction = FunctionSignature()
+    "period" period = FunctionCallExpr()
+    {
+      ccs = new CreateChannelStatement(nameComponents.first,
+                                   nameComponents.second, appliedFunction, period);
+    }
+  )
+    {
+      return ccs;
+    }
+}
+
+@new
+CreateBrokerStatement BrokerSpecification() throws ParseException:
+{
+  CreateBrokerStatement cbs = null;
+  Pair<Identifier,Identifier> name = null;
+  String endPoint = null;
+}
+{
+  (
+    <BROKER>  name = QualifiedName()
+    <AT>  endPoint = StringLiteral()
+    {
+      cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
+    }
+  )
+    {
+      return cbs;
+    }
+}
+
+@new
+Statement ChannelSubscriptionStatement() throws ParseException:
+{
+  Statement stmt = null;
+  Pair<Identifier,Identifier> nameComponents = null;
+  List<Expression> argList = new ArrayList<Expression>();
+  Expression tmp = null;
+  String id = null;
+  String subscriptionId = null;
+  Pair<Identifier,Identifier> brokerName = null;
+}
+{
+  (
+  "subscribe" <TO> nameComponents = QualifiedName()
+   <LEFTPAREN> (tmp = Expression()
+   {
+      argList.add(tmp);
+   }
+   (<COMMA> tmp = Expression()
+   {
+      argList.add(tmp);
+   }
+   )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
+   {
+      stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+   }
+   | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
+      {
+        setDataverses(new ArrayList<String>());
+        setDatasets(new ArrayList<String>());
+        VariableExpr varExp = new VariableExpr();
+        VarIdentifier var = new VarIdentifier();
+        varExp.setVar(var);
+        var.setValue("$subscriptionPlaceholder");
+        getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
+        List<String> dataverses = getDataverses();
+        List<String> datasets = getDatasets();
+        // we remove the pointer to the dataverses and datasets
+        setDataverses(null);
+        setDatasets(null);
+        stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
+      }
+     | "change" "subscription" subscriptionId = StringLiteral()  <ON> nameComponents = QualifiedName()
+       <LEFTPAREN> (tmp = Expression()
+       {
+         argList.add(tmp);
+       }
+       (<COMMA> tmp = Expression()
+       {
+         argList.add(tmp);
+       }
+       )*)? <RIGHTPAREN>
+        <TO> brokerName = QualifiedName()
+      {
+        stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
+      }
+    )
+    {
+      return stmt;
+    }
+}
+
+<DEFAULT,IN_DBL_BRACE>
+TOKEN [IGNORE_CASE]:
+{
+    <BROKER : "broker">
+  | <CHANNEL : "channel">
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
new file mode 100644
index 0000000..77e8afe
--- /dev/null
+++ b/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime test cases under 'src/test/resources/runtimets'.
+ */
+@RunWith(Parameterized.class)
+public class BADExecutionTest {
+
+    protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName());
+
+    protected static final String PATH_ACTUAL = "target/rttest" + File.separator;
+    protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
+            File.separator);
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+
+    protected static AsterixTransactionProperties txnProperties;
+    private static final TestExecutor testExecutor = new TestExecutor();
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    protected static TestGroup FailedGroup;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+        ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+        ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
+    }
+
+    @Parameters(name = "BADExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return buildTestsInXml("testsuite.xml");
+    }
+
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        TestCaseContext.Builder b = new TestCaseContext.Builder();
+        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+            testArgs.add(new Object[] { ctx });
+        }
+        return testArgs;
+
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public BADExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
new file mode 100644
index 0000000..040a4e9
--- /dev/null
+++ b/asterix-opt/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.test.optimizer.OptimizerTest;
+import org.apache.asterix.test.runtime.HDFSCluster;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BADOptimizerTest extends OptimizerTest {
+
+    private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName());
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        final File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+
+        HDFSCluster.getInstance().setup();
+
+        integrationUtil.init(true);
+        // Set the node resolver to be the identity resolver that expects node names
+        // to be node controller ids; a valid assumption in test environment.
+        System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
+                IdentitiyResolverFactory.class.getName());
+    }
+
+    public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) {
+        super(queryFile, expectedFile, actualFile);
+        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/conf/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/conf/asterix-build-configuration.xml b/asterix-opt/src/test/resources/conf/asterix-build-configuration.xml
new file mode 100644
index 0000000..c2f5d41
--- /dev/null
+++ b/asterix-opt/src/test/resources/conf/asterix-build-configuration.xml
@@ -0,0 +1,110 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+  <metadataNode>asterix_nc1</metadataNode>
+  <store>
+    <ncId>asterix_nc1</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <store>
+    <ncId>asterix_nc2</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <transactionLogDir>
+    <ncId>asterix_nc1</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+  </transactionLogDir>
+  <transactionLogDir>
+    <ncId>asterix_nc2</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+  </transactionLogDir>
+  <extensions>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+    </extension>
+  </extensions>
+  <property>
+    <name>max.wait.active.cluster</name>
+    <value>60</value>
+    <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+      nodes are available)
+      before a submitted query/statement can be
+      executed. (Default = 60 seconds)
+    </description>
+  </property>
+  <property>
+    <name>log.level</name>
+    <value>WARNING</value>
+    <description>Log level for running tests/build</description>
+  </property>
+  <property>
+    <name>compiler.framesize</name>
+    <value>32768</value>
+  </property>
+  <property>
+    <name>compiler.sortmemory</name>
+    <value>327680</value>
+  </property>
+  <property>
+    <name>compiler.groupmemory</name>
+    <value>163840</value>
+  </property>
+  <property>
+    <name>compiler.joinmemory</name>
+    <value>163840</value>
+  </property>
+  <property>
+    <name>compiler.pregelix.home</name>
+    <value>~/pregelix</value>
+  </property>
+  <property>
+    <name>storage.buffercache.pagesize</name>
+    <value>32768</value>
+    <description>The page size in bytes for pages in the buffer cache.
+      (Default = "32768" // 32KB)
+    </description>
+  </property>
+  <property>
+    <name>storage.buffercache.size</name>
+    <value>33554432</value>
+    <description>The size of memory allocated to the disk buffer cache.
+      The value should be a multiple of the buffer cache page size(Default
+      = "33554432" // 32MB)
+    </description>
+  </property>
+  <property>
+    <name>storage.memorycomponent.numpages</name>
+    <value>8</value>
+    <description>The number of pages to allocate for a memory component.
+      (Default = 8)
+    </description>
+  </property>
+  <property>
+    <name>plot.activate</name>
+    <value>false</value>
+    <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+    </description>
+  </property>
+</asterixConfiguration>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/conf/cluster.xml
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/conf/cluster.xml b/asterix-opt/src/test/resources/conf/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/asterix-opt/src/test/resources/conf/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+  <instance_name>asterix</instance_name>
+  <store>storage</store>
+
+  <data_replication>
+    <enabled>false</enabled>
+    <replication_port>2016</replication_port>
+    <replication_factor>2</replication_factor>
+    <auto_failover>false</auto_failover>
+    <replication_time_out>30</replication_time_out>
+  </data_replication>
+
+  <master_node>
+    <id>master</id>
+    <client_ip>127.0.0.1</client_ip>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <client_port>1098</client_port>
+    <cluster_port>1099</cluster_port>
+    <http_port>8888</http_port>
+  </master_node>
+  <node>
+    <id>nc1</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2016</replication_port>
+  </node>
+  <node>
+    <id>nc2</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2017</replication_port>
+  </node>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/conf/hyracks-deployment.properties
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/conf/hyracks-deployment.properties b/asterix-opt/src/test/resources/conf/hyracks-deployment.properties
new file mode 100644
index 0000000..17a6772
--- /dev/null
+++ b/asterix-opt/src/test/resources/conf/hyracks-deployment.properties
@@ -0,0 +1,21 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl
+cc.ip=127.0.0.1
+cc.port=1098

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/conf/test.properties
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/conf/test.properties b/asterix-opt/src/test/resources/conf/test.properties
new file mode 100644
index 0000000..86269c8
--- /dev/null
+++ b/asterix-opt/src/test/resources/conf/test.properties
@@ -0,0 +1,22 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=nc1data
+nc2.stores=nc2data
+OutputDir=/tmp/asterix_output/

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-create.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-create.aql b/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-create.aql
new file mode 100644
index 0000000..4dc9291
--- /dev/null
+++ b/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-create.aql
@@ -0,0 +1,36 @@
+/*
+ * Description  : Check the Plan used by a channel
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+write output to nc1:"rttest/channel-create.adm";
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql b/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
new file mode 100644
index 0000000..682bd6d
--- /dev/null
+++ b/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-subscribe.aql
@@ -0,0 +1,40 @@
+/*
+ * Description  : Check the Plan for Subscribing to a channel    
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
+
+write output to nc1:"rttest/channel-subscribe.adm";
+
+create broker brokerA at "http://www.hello.com";
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live") on brokerA;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql b/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
new file mode 100644
index 0000000..7cdec50
--- /dev/null
+++ b/asterix-opt/src/test/resources/optimizerts/queries/channel/channel-unsubscribe.aql
@@ -0,0 +1,38 @@
+/*
+ * Description  : Check the Plan for Unsubscribing to a channel
+ * Expected Res : Success
+ * Date         : Mar 2015
+ */
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text 
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
+
+write output to nc1:"rttest/channel-unsubscribe.adm";
+
+unsubscribe "c45ef6d0-c5ae-4b9e-b5da-cf1932718296" from nearbyTweetChannel;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/optimizerts/results/channel/channel-create.plan
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/optimizerts/results/channel/channel-create.plan b/asterix-opt/src/test/resources/optimizerts/results/channel/channel-create.plan
new file mode 100644
index 0000000..f597191
--- /dev/null
+++ b/asterix-opt/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -0,0 +1,30 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+          -- ASSIGN  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- NESTED_LOOP  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/optimizerts/results/channel/channel-subscribe.plan b/asterix-opt/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
new file mode 100644
index 0000000..4530923
--- /dev/null
+++ b/asterix-opt/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -0,0 +1,44 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+          -- ASSIGN  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- NESTED_LOOP  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- COMMIT  |PARTITIONED|
+      -- STREAM_PROJECT  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- INSERT_DELETE  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$8]  |PARTITIONED|
+              -- ASSIGN  |UNPARTITIONED|
+                -- STREAM_PROJECT  |UNPARTITIONED|
+                  -- ASSIGN  |UNPARTITIONED|
+                    -- STREAM_PROJECT  |UNPARTITIONED|
+                      -- ASSIGN  |UNPARTITIONED|
+                        -- ASSIGN  |UNPARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan b/asterix-opt/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
new file mode 100644
index 0000000..a9e383a
--- /dev/null
+++ b/asterix-opt/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -0,0 +1,44 @@
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- HASH_PARTITION_EXCHANGE [$$22]  |PARTITIONED|
+          -- ASSIGN  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- NESTED_LOOP  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+-- COMMIT  |PARTITIONED|
+  -- STREAM_PROJECT  |PARTITIONED|
+    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+      -- INSERT_DELETE  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- MATERIALIZE  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- BTREE_SEARCH  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
new file mode 100644
index 0000000..41b036a
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description  : Create Channel Test. Confirms that the subscription and result datasets are created
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.2.update.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.2.update.aql
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
new file mode 100644
index 0000000..eb341e9
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_datasets/create_channel_check_datasets.3.query.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel where $result.ChannelName = "nearbyTweetChannel"
+for $x in dataset Metadata.Dataset
+where $x.DatasetName = $result.SubscriptionsDatasetName
+or $x.DatasetName = $result.ResultsDatasetName
+return $x;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
new file mode 100644
index 0000000..7bace03
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description  : Create Channel Test
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.2.update.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.2.update.aql
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..9a1e170
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/create_channel_check_metadata/create_channel_check_metadata.3.query.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel return $result;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
new file mode 100644
index 0000000..afc7d5e
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.1.ddl.aql
@@ -0,0 +1,38 @@
+/*
+* Description  : Drop Channel Test. Check Metadata
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel1 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel2 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel3 using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.update.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.update.aql
new file mode 100644
index 0000000..f466b9c
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.2.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+drop channel nearbyTweetChannel2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
new file mode 100644
index 0000000..e762a27
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_datasets/drop_channel_check_datasets.3.query.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel
+for $x in dataset Metadata.Dataset
+where $x.DatasetName = $result.SubscriptionsDatasetName
+or $x.DatasetName = $result.ResultsDatasetName
+return $x;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
new file mode 100644
index 0000000..afc7d5e
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.ddl.aql
@@ -0,0 +1,38 @@
+/*
+* Description  : Drop Channel Test. Check Metadata
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel1 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel2 using NearbyTweetsContainingText@2 period duration("PT10M");
+
+create repetitive channel nearbyTweetChannel3 using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.update.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.update.aql
new file mode 100644
index 0000000..f466b9c
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.2.update.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+drop channel nearbyTweetChannel2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
new file mode 100644
index 0000000..9a1e170
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/drop_channel_check_metadata/drop_channel_check_metadata.3.query.aql
@@ -0,0 +1,3 @@
+use dataverse channels;
+
+for $result in dataset Metadata.Channel return $result;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
new file mode 100644
index 0000000..41b036a
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.1.ddl.aql
@@ -0,0 +1,34 @@
+/*
+* Description  : Create Channel Test. Confirms that the subscription and result datasets are created
+* Expected Res : Success
+* Date         : March 2015
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  sender-location: point,
+  send-time: datetime,
+  referred-topics: {{ string }},
+  message-text: string,
+  countA: int32,
+  countB: int32
+}
+
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
+
+create function NearbyTweetsContainingText($location, $text) {
+  for $tweet in dataset TweetMessageuuids
+  let $circle := create-circle($location,30.0)
+  where contains($tweet.message-text,$text)
+  and spatial-intersect($tweet.sender-location, $location)
+  return $tweet.message-text
+};
+
+create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
new file mode 100644
index 0000000..6d35506
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.2.update.aql
@@ -0,0 +1,7 @@
+use dataverse channels;
+
+subscribe to nearbyTweetChannel (point("30.0, 30.0"), "Live");
+
+subscribe to nearbyTweetChannel (point("20.0, 20.0"), "Long");
+
+subscribe to nearbyTweetChannel (point("10.0, 10.0"), "Prosper");
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql b/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql
new file mode 100644
index 0000000..0d2c46e
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/queries/channel/subscribe_channel_check_subscriptions/subscribe_channel_check_subscriptions.3.query.aql
@@ -0,0 +1,4 @@
+use dataverse channels;
+
+for $test in dataset nearbyTweetChannelSubscriptions
+return $test.param1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_datasets/create_channel_check_datasets.1.adm
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_datasets/create_channel_check_datasets.1.adm b/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_datasets/create_channel_check_datasets.1.adm
new file mode 100644
index 0000000..baa5299
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_datasets/create_channel_check_datasets.1.adm
@@ -0,0 +1,3 @@
+[ { "DataverseName": "channels", "DatasetName": "nearbyTweetChannelResults", "DataTypeName": "nearbyTweetChannelResultsType", "DatasetType": "INTERNAL", "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "rid" ], "PrimaryKey": [ "rid" ], "GroupName": "DEFAULT_NG_ALL_NODES", "Autogenerated": true, "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ] }, "ExternalDetails": null, "Hints": {{  }}, "Timestamp": "Tue Mar 10 15:20:05 PDT 2015", "DatasetId": 106i32, "PendingOp": 0i32 }
+, { "DataverseName": "channels", "DatasetName": "nearbyTweetChannelSubscriptions", "DataTypeName": "nearbyTweetChannelSubscriptionsType", "DatasetType": "INTERNAL", "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "subscription-id" ], "PrimaryKey": [ "subscription-id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "Autogenerated": true, "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ] }, "ExternalDetails": null, "Hints": {{  }}, "Timestamp": "Tue Mar 10 15:20:05 PDT 2015", "DatasetId": 105i32, "PendingOp": 0i32 }
+ ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_metadata/create_channel_check_metadata.1.adm
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_metadata/create_channel_check_metadata.1.adm b/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_metadata/create_channel_check_metadata.1.adm
new file mode 100644
index 0000000..66a52f3
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/results/channels/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -0,0 +1,2 @@
+[ { "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", "SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", "ResultsDatasetName": "nearbyTweetChannelResults", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
+ ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_datasets/drop_channel_check_datasets.1.adm b/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
new file mode 100644
index 0000000..7b6fdf4
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_datasets/drop_channel_check_datasets.1.adm
@@ -0,0 +1,5 @@
+[ { "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Results", "DataTypeName": "nearbyTweetChannel1ResultsType", "DatasetType": "INTERNAL", "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "rid" ], "PrimaryKey": [ "rid" ], "GroupName": "DEFAULT_NG_ALL_NODES", "Autogenerated": true, "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ] }, "ExternalDetails": null, "Hints": {{  }}, "Timestamp": "Wed Mar 11 10:11:53 PDT 2015", "DatasetId": 116i32, "PendingOp": 0i32 }
+, { "DataverseName": "channels", "DatasetName": "nearbyTweetChannel1Subscriptions", "DataTypeName": "nearbyTweetChannel1SubscriptionsType", "DatasetType": "INTERNAL", "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "subscription-id" ], "PrimaryKey": [ "subscription-id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "Autogenerated": true, "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ] }, "ExternalDetails": null, "Hints": {{  }}, "Timestamp": "Wed Mar 11 10:11:53 PDT 2015", "DatasetId": 115i32, "PendingOp": 0i32 }
+, { "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Results", "DataTypeName": "nearbyTweetChannel3ResultsType", "DatasetType": "INTERNAL", "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "rid" ], "PrimaryKey": [ "rid" ], "GroupName": "DEFAULT_NG_ALL_NODES", "Autogenerated": true, "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ] }, "ExternalDetails": null, "Hints": {{  }}, "Timestamp": "Wed Mar 11 10:11:53 PDT 2015", "DatasetId": 120i32, "PendingOp": 0i32 }
+, { "DataverseName": "channels", "DatasetName": "nearbyTweetChannel3Subscriptions", "DataTypeName": "nearbyTweetChannel3SubscriptionsType", "DatasetType": "INTERNAL", "InternalDetails": { "FileStructure": "BTREE", "PartitioningStrategy": "HASH", "PartitioningKey": [ "subscription-id" ], "PrimaryKey": [ "subscription-id" ], "GroupName": "DEFAULT_NG_ALL_NODES", "Autogenerated": true, "CompactionPolicy": "prefix", "CompactionPolicyProperties": [ { "Name": "max-mergable-component-size", "Value": "1073741824" }, { "Name": "max-tolerance-component-count", "Value": "5" } ] }, "ExternalDetails": null, "Hints": {{  }}, "Timestamp": "Wed Mar 11 10:11:53 PDT 2015", "DatasetId": 119i32, "PendingOp": 0i32 }
+ ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0921e3c9/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
----------------------------------------------------------------------
diff --git a/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_metadata/drop_channel_check_metadata.1.adm b/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
new file mode 100644
index 0000000..de2948a
--- /dev/null
+++ b/asterix-opt/src/test/resources/runtimets/results/channels/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -0,0 +1,3 @@
+[ { "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", "SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", "ResultsDatasetName": "nearbyTweetChannel1Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
+, { "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", "SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", "ResultsDatasetName": "nearbyTweetChannel3Results", "Function": "channels.NearbyTweetsContainingText@2", "Duration": "PT10M" }
+ ]
\ No newline at end of file