You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2018/05/08 18:07:41 UTC

[2/2] asterixdb-bad git commit: Allow BAD jobs to update their specifications to use new indexes

Allow BAD jobs to update their specifications to use new indexes

- storage format changes: new field for Channel body

This changes uses the Asterix upsertDeployedJobSpec to
recompile and update the channel job when new indexes are
created.

Added test case
Moved methods from Asterix DeployedJobService to BADJobService

Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/0da2d001
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/0da2d001
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/0da2d001

Branch: refs/heads/master
Commit: 0da2d001ad18cc888a53b26a5c2867f8a90c9969
Parents: 345b0f5
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Mon May 7 15:12:18 2018 -0700
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Mon May 7 15:12:18 2018 -0700

----------------------------------------------------------------------
 .../org/apache/asterix/bad/BADConstants.java    |   1 +
 .../org/apache/asterix/bad/BADJobService.java   | 277 +++++++++++++
 .../apache/asterix/bad/ChannelJobService.java   |  66 ----
 .../asterix/bad/lang/BADStatementExecutor.java  | 214 ++++++++--
 .../lang/statement/CreateChannelStatement.java  |  46 +--
 .../statement/CreateProcedureStatement.java     |  98 ++---
 .../statement/ExecuteProcedureStatement.java    |  38 +-
 .../bad/metadata/BADMetadataRecordTypes.java    |   6 +-
 .../apache/asterix/bad/metadata/Channel.java    |   8 +-
 .../bad/metadata/ChannelTupleTranslator.java    |  12 +-
 .../metadata/DeployedJobSpecEventListener.java  |  99 ++---
 .../apache/asterix/bad/metadata/Procedure.java  |   3 -
 .../src/main/resources/lang-extension/lang.txt  |   2 +-
 .../asterix/bad/test/BADListenerTest.java       |  86 ++++
 .../channel/add_index/add_index.1.ddl.sqlpp     |  71 ++++
 .../channel/add_index/add_index.2.update.sqlpp  | 394 +++++++++++++++++++
 .../channel/add_index/add_index.3.update.sqlpp  |  28 ++
 .../channel/add_index/add_index.4.sleep.sqlpp   |  26 ++
 .../channel/add_index/add_index.5.query.sqlpp   |  29 ++
 .../channel/add_index/add_index.6.ddl.sqlpp     |  28 ++
 .../channel/drop_index/drop_index.1.ddl.sqlpp   |  88 +++++
 .../results/channel/add_index/add_index.1.adm   |   1 +
 .../create_channel_check_metadata.1.adm         |   2 +-
 .../drop_channel_check_metadata.1.adm           |   4 +-
 .../create_procedure_check_metadata.1.adm       |  12 +-
 .../src/test/resources/runtimets/testsuite.xml  |  11 +
 26 files changed, 1343 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index d2d0fa3..d422663 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -50,6 +50,7 @@ public interface BADConstants {
     String FIELD_NAME_RETURN_TYPE = "ReturnType";
     String FIELD_NAME_DEFINITION = "Definition";
     String FIELD_NAME_LANGUAGE = "Language";
+    String FIELD_NAME_BODY = "Body";
     //To enable new Asterix TxnId for separate deployed job spec invocations
     byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
     int EXECUTOR_TIMEOUT = 20;

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
new file mode 100644
index 0000000..e326ce6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.StringReader;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADParserFactory;
+import org.apache.asterix.bad.lang.BADStatementExecutor;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for channel jobs
+ */
+public class BADJobService {
+
+    private static final Logger LOGGER = Logger.getLogger(BADJobService.class.getName());
+
+    //pool size one (only running one thread at a time)
+    private static final int POOL_SIZE = 1;
+
+    private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000;
+
+    //Starts running a deployed job specification periodically with an interval of "period" seconds
+    public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+            IHyracksClientConnection hcc, long period, Map<byte[], byte[]> jobParameters, EntityId entityId,
+            ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(POOL_SIZE);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, jobParameters, period, entityId,
+                            txnIdFactory, listener)) {
+                        scheduledExecutorService.shutdown();
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + entityId.getExtensionName() + " "
+                            + entityId.getDataverse() + "." + entityId.getEntityName() + ".", e);
+                }
+            }
+        }, period, period, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory,
+            DeployedJobSpecEventListener listener) throws Exception {
+        long executionMilliseconds =
+                runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory, null, listener, null);
+        if (executionMilliseconds > period) {
+            LOGGER.log(Level.SEVERE,
+                    "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                            + entityId.getEntityName() + " was unable to meet the required period of " + period
+                            + " milliseconds. Actually took " + executionMilliseconds + " execution will shutdown"
+                            + new Date());
+            return false;
+        }
+        return true;
+    }
+
+    public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory,
+            ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor)
+            throws Exception {
+        listener.waitWhileAtState(ActivityState.SUSPENDED);
+
+        //Add the Asterix Transaction Id to the map
+        jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
+                String.valueOf(txnIdFactory.create().getId()).getBytes());
+
+        long startTime = Instant.now().toEpochMilli();
+        JobId jobId = hcc.startJob(distributedId, jobParameters);
+
+        hcc.waitForCompletion(jobId);
+        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+        if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) {
+            ResultReader resultReader = new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
+
+            ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(),
+                    new IStatementExecutor.Stats(), null);
+        }
+
+        LOGGER.log(Level.SEVERE,
+                "Deployed Job execution completed for " + entityId.getExtensionName() + " " + entityId.getDataverse()
+                        + "." + entityId.getEntityName() + ". Took " + executionMilliseconds + " milliseconds ");
+
+        return executionMilliseconds;
+
+    }
+
+
+    public static long findPeriod(String duration) {
+        //TODO: Allow Repetitive Channels to use YMD durations
+        String hoursMinutesSeconds = "";
+        if (duration.indexOf('T') != -1) {
+            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
+        }
+        double seconds = 0;
+        if (hoursMinutesSeconds != "") {
+            int pos = 0;
+            if (hoursMinutesSeconds.indexOf('H') != -1) {
+                Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
+                seconds += (hours * 60 * 60);
+                pos = hoursMinutesSeconds.indexOf('H') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('M') != -1) {
+                Double minutes =
+                        Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
+                seconds += (minutes * 60);
+                pos = hoursMinutesSeconds.indexOf('M') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('S') != -1) {
+                Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
+                seconds += (s);
+            }
+        }
+        return (long) (seconds * 1000);
+    }
+
+    public static JobSpecification compilePushChannel(IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            if (bActiveTxn) {
+                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        return jobSpec;
+    }
+
+    public static void redeployJobSpec(EntityId entityId, String queryBodyString, MetadataProvider metadataProvider,
+            BADStatementExecutor badStatementExecutor, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+        DeployedJobSpecEventListener listener =
+                (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
+        if (listener == null) {
+            LOGGER.severe("Tried to redeploy the job for " + entityId + " but no listener exists.");
+            return;
+        }
+
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new StringReader(queryBodyString)).parse();
+        JobSpecification jobSpec = null;
+        if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
+                || listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL)) {
+            //Channels
+            SetStatement ss = (SetStatement) fStatements.get(0);
+            metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
+            if (listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)) {
+                jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
+            } else {
+                jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc,
+                        null, null, null, null, true, null);
+            }
+        } else {
+            //Procedures
+            metadataProvider.setResultSetId(listener.getResultId());
+            final IStatementExecutor.ResultDelivery resultDelivery =
+                    requestParameters.getResultProperties().getDelivery();
+            final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+            final IStatementExecutor.Stats stats = requestParameters.getStats();
+            boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC
+                    || resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED;
+            metadataProvider.setResultAsyncMode(resultsAsync);
+            metadataProvider.setMaxResultReads(1);
+
+            jobSpec = compileProcedureJob(badStatementExecutor, metadataProvider, hcc, hdc, stats, fStatements.get(1));
+
+        }
+        hcc.upsertDeployedJobSpec(listener.getDeployedJobSpecId(), jobSpec);
+
+        listener.resume();
+
+    }
+
+    public static JobSpecification compileQueryJob(IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            throw e;
+        }
+        return jobSpec;
+    }
+
+    private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc,
+            IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception {
+        if (procedureStatement.getKind() == Statement.Kind.INSERT) {
+            return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+                    procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+        } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
+            return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement);
+        } else {
+            SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
+            procedureStatement.accept(visitor, null);
+            return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
+                    hcc, true);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "BADJobService";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
deleted file mode 100644
index 3df9a76..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-import java.util.logging.Logger;
-
-
-/**
- * Provides functionality for channel jobs
- */
-public class ChannelJobService {
-
-    private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
-
-
-    public static long findPeriod(String duration) {
-        //TODO: Allow Repetitive Channels to use YMD durations
-        String hoursMinutesSeconds = "";
-        if (duration.indexOf('T') != -1) {
-            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
-        }
-        double seconds = 0;
-        if (hoursMinutesSeconds != "") {
-            int pos = 0;
-            if (hoursMinutesSeconds.indexOf('H') != -1) {
-                Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
-                seconds += (hours * 60 * 60);
-                pos = hoursMinutesSeconds.indexOf('H') + 1;
-            }
-            if (hoursMinutesSeconds.indexOf('M') != -1) {
-                Double minutes =
-                        Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
-                seconds += (minutes * 60);
-                pos = hoursMinutesSeconds.indexOf('M') + 1;
-            }
-            if (hoursMinutesSeconds.indexOf('S') != -1) {
-                Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
-                seconds += (s);
-            }
-        }
-        return (long) (seconds * 1000);
-    }
-
-
-    @Override
-    public String toString() {
-        return "ChannelJobService";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 8c7143f..4ab7530 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -18,22 +18,27 @@
  */
 package org.apache.asterix.bad.lang;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
 import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
@@ -42,9 +47,12 @@ import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 
 public class BADStatementExecutor extends QueryTranslator {
@@ -56,16 +64,21 @@ public class BADStatementExecutor extends QueryTranslator {
 
     //TODO: Most of this file could go away if we had metadata dependencies
 
-    private void checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
-            throws CompilationException, AlgebricksException {
+    private Pair<List<Channel>, List<Procedure>> checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx,
+            String dataverse, String dataset, boolean checkAll) throws AlgebricksException {
+        List<Channel> channelsUsingDataset = new ArrayList<>();
+        List<Procedure> proceduresUsingDataset = new ArrayList<>();
         List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
         for (Channel channel : channels) {
             List<List<List<String>>> dependencies = channel.getDependencies();
             List<List<String>> datasetDependencies = dependencies.get(0);
             for (List<String> dependency : datasetDependencies) {
                 if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
-                    throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
-                            + channel.getChannelId() + " depends on it!");
+                    channelsUsingDataset.add(channel);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+                    }
+
                 }
             }
 
@@ -76,12 +89,82 @@ public class BADStatementExecutor extends QueryTranslator {
             List<List<String>> datasetDependencies = dependencies.get(0);
             for (List<String> dependency : datasetDependencies) {
                 if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) {
-                    throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
-                            + procedure.getEntityId() + " depends on it!");
+                    proceduresUsingDataset.add(procedure);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+                    }
                 }
             }
 
         }
+        return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+    }
+
+    private Pair<List<Channel>, List<Procedure>> checkIfFunctionIsInUse(MetadataTransactionContext mdTxnCtx,
+            String dvId, String function, String arity, boolean checkAll)
+            throws CompilationException, AlgebricksException {
+        List<Channel> channelsUsingFunction = new ArrayList<>();
+        List<Procedure> proceduresUsingFunction = new ArrayList<>();
+
+        List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
+        for (Channel channel : channels) {
+            List<List<List<String>>> dependencies = channel.getDependencies();
+            List<List<String>> datasetDependencies = dependencies.get(1);
+            for (List<String> dependency : datasetDependencies) {
+                if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
+                        && dependency.get(2).equals(arity)) {
+                    channelsUsingFunction.add(channel);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+                    }
+                }
+            }
+
+        }
+        List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
+        for (Procedure procedure : procedures) {
+            List<List<List<String>>> dependencies = procedure.getDependencies();
+            List<List<String>> datasetDependencies = dependencies.get(1);
+            for (List<String> dependency : datasetDependencies) {
+                if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
+                        && dependency.get(2).equals(arity)) {
+                    proceduresUsingFunction.add(procedure);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+                    }
+                }
+            }
+
+        }
+        return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+    }
+
+    private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
+            throws CompilationException, AlgebricksException {
+        Pair<List<Channel>, List<Procedure>> dependents = checkIfDatasetIsInUse(mdTxnCtx, dataverse, dataset, false);
+        if (dependents.first.size() > 0) {
+            throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
+                    + dependents.first.get(0).getChannelId() + " depends on it!");
+        }
+        if (dependents.second.size() > 0) {
+            throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". "
+                    + dependents.second.get(0).getEntityId() + " depends on it!");
+        }
+    }
+
+    private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, String dataverse, String function,
+            String arity, FunctionSignature sig) throws CompilationException, AlgebricksException {
+        Pair<List<Channel>, List<Procedure>> dependents =
+                checkIfFunctionIsInUse(mdTxnCtx, dataverse, function, arity, false);
+        String errorStart = sig != null ? "Cannot drop function " + sig + "." : "Cannot drop index.";
+        if (dependents.first.size() > 0) {
+            throw new CompilationException(
+                    errorStart + " " + dependents.first.get(0).getChannelId() + " depends on it!");
+        }
+        if (dependents.second.size() > 0) {
+            throw new CompilationException(
+                    errorStart + " " + dependents.second.get(0).getEntityId() + " depends on it!");
+        }
     }
 
     @Override
@@ -92,13 +175,88 @@ public class BADStatementExecutor extends QueryTranslator {
         String dvId = getActiveDataverse(((DropDatasetStatement) stmt).getDataverseName());
         Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName();
 
-        checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+        throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleDatasetDropStatement(metadataProvider, stmt, hcc, requestParameters);
     }
 
     @Override
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
+
+        //TODO: Check whether a delete or insert procedure using the index. If so, we will need to
+        // disallow the procedure until after the newly distributed version is ready
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        //Allow channels to use the new index
+        String dvId = getActiveDataverse(((CreateIndexStatement) stmt).getDataverseName());
+        String dsId = ((CreateIndexStatement) stmt).getDatasetName().getValue();
+
+        Pair<List<Channel>, List<Procedure>> usages = checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId, true);
+
+        List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        for (Dataverse dv : dataverseList) {
+            List<Function> functions = MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+            for (Function function : functions) {
+                for (List<String> datasetDependency : function.getDependencies().get(0)) {
+                    if (datasetDependency.get(0).equals(dvId) && datasetDependency.get(1).equals(dsId)) {
+                        Pair<List<Channel>, List<Procedure>> functionUsages =
+                                checkIfFunctionIsInUse(mdTxnCtx, function.getDataverseName(), function.getName(),
+                                        Integer.toString(function.getArity()), true);
+                        for (Channel channel : functionUsages.first) {
+                            if (!usages.first.contains(channel)) {
+                                usages.first.add(channel);
+                            }
+                        }
+                        for (Procedure procedure : functionUsages.second) {
+                            if (!usages.second.contains(procedure)) {
+                                usages.second.add(procedure);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+
+        for (Channel channel : usages.first) {
+            DeployedJobSpecEventListener listener =
+                    (DeployedJobSpecEventListener) activeEventHandler.getListener(channel.getChannelId());
+            listener.suspend();
+        }
+        for (Procedure procedure : usages.second) {
+            DeployedJobSpecEventListener listener =
+                    (DeployedJobSpecEventListener) activeEventHandler.getListener(procedure.getEntityId());
+            listener.suspend();
+        }
+
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        metadataProvider.getLocks().unlock();
+
+        metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+        super.handleCreateIndexStatement(metadataProvider, stmt, hcc, requestParameters);
+
+        for (Channel channel : usages.first) {
+            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            BADJobService.redeployJobSpec(channel.getChannelId(), channel.getChannelBody(), metadataProvider, this, hcc,
+                    requestParameters);
+            metadataProvider.getLocks().unlock();
+        }
+        for (Procedure procedure : usages.second) {
+            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            BADJobService.redeployJobSpec(procedure.getEntityId(), procedure.getBody(), metadataProvider, this, hcc,
+                    requestParameters);
+            metadataProvider.getLocks().unlock();
+        }
+
+
+    }
+
+    @Override
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -106,7 +264,20 @@ public class BADStatementExecutor extends QueryTranslator {
         String dvId = getActiveDataverse(((IndexDropStatement) stmt).getDataverseName());
         Identifier dsId = ((IndexDropStatement) stmt).getDatasetName();
 
-        checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+        throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
+
+        List<Dataverse> dataverseList = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        for (Dataverse dv : dataverseList) {
+            List<Function> functions = MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+            for (Function function : functions) {
+                for (List<String> datasetDependency : function.getDependencies().get(0)) {
+                    if (datasetDependency.get(0).equals(dvId) && datasetDependency.get(1).equals(dsId.getValue())) {
+                        throwErrorIfFunctionUsed(mdTxnCtx, function.getDataverseName(), function.getName(),
+                                Integer.toString(function.getArity()), null);
+                    }
+                }
+            }
+        }
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleIndexDropStatement(metadataProvider, stmt, hcc, requestParameters);
@@ -122,32 +293,7 @@ public class BADStatementExecutor extends QueryTranslator {
         String function = sig.getName();
         String arity = Integer.toString(sig.getArity());
 
-        List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
-        for (Channel channel : channels) {
-            List<List<List<String>>> dependencies = channel.getDependencies();
-            List<List<String>> datasetDependencies = dependencies.get(1);
-            for (List<String> dependency : datasetDependencies) {
-                if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
-                        && dependency.get(2).equals(arity)) {
-                    throw new CompilationException(
-                            "Cannot drop function " + sig + ". " + channel.getChannelId() + " depends on it!");
-                }
-            }
-
-        }
-        List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx);
-        for (Procedure procedure : procedures) {
-            List<List<List<String>>> dependencies = procedure.getDependencies();
-            List<List<String>> datasetDependencies = dependencies.get(1);
-            for (List<String> dependency : datasetDependencies) {
-                if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function)
-                        && dependency.get(2).equals(arity)) {
-                    throw new CompilationException(
-                            "Cannot drop function " + sig + ". " + procedure.getEntityId() + " depends on it!");
-                }
-            }
-
-        }
+        throwErrorIfFunctionUsed(mdTxnCtx, dvId, function, arity, sig);
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleFunctionDropStatement(metadataProvider, stmt);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 87ac320..22767f2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -28,13 +28,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
@@ -57,7 +56,6 @@ import org.apache.asterix.lang.common.literal.StringLiteral;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
-import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
@@ -89,7 +87,7 @@ public class CreateChannelStatement extends ExtensionStatement {
     private final CallExpr period;
     private Identifier dataverseName;
     private String duration;
-    private InsertStatement channelResultsInsertQuery;
+    private String body;
     private String subscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
@@ -133,10 +131,6 @@ public class CreateChannelStatement extends ExtensionStatement {
         return period;
     }
 
-    public InsertStatement getChannelResultsInsertQuery() {
-        return channelResultsInsertQuery;
-    }
-
     @Override
     public byte getCategory() {
         return Category.DDL;
@@ -221,28 +215,6 @@ public class CreateChannelStatement extends ExtensionStatement {
 
     }
 
-    private JobSpecification compilePushChannel(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, Query q) throws Exception {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        JobSpecification jobSpec = null;
-        try {
-            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-        } catch (Exception e) {
-            LOGGER.log(Level.INFO, e.getMessage(), e);
-            if (bActiveTxn) {
-                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        } finally {
-            metadataProvider.getLocks().unlock();
-        }
-        return jobSpec;
-    }
-
     private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception {
         StringBuilder builder = new StringBuilder();
@@ -271,13 +243,15 @@ public class CreateChannelStatement extends ExtensionStatement {
             builder.append(" returning a");
         }
         builder.append(";");
+        body = builder.toString();
         BADParserFactory factory = new BADParserFactory();
         List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
 
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
         if (push) {
-            return compilePushChannel(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(1));
+            return BADJobService.compilePushChannel(statementExecutor, metadataProvider, hcc,
+                    (Query) fStatements.get(1));
         }
         return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
                 hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
@@ -286,9 +260,10 @@ public class CreateChannelStatement extends ExtensionStatement {
     private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
             DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) throws Exception {
         if (channeljobSpec != null) {
+            channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
             DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec);
-            ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
-                    ChannelJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory);
+            ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
+                    BADJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory, listener);
             listener.storeDistributedInfo(destributedId, ses, null, null);
         }
 
@@ -354,14 +329,15 @@ public class CreateChannelStatement extends ExtensionStatement {
 
             // Now we subscribe
             if (listener == null) {
-                listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, null,
+                listener = new DeployedJobSpecEventListener(appCtx, entityId,
+                        push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL, null,
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
 
             setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory());
             channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
-                    duration, null);
+                    duration, null, body);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index f3561a4..03db7bc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -20,18 +20,23 @@ package org.apache.asterix.bad.lang.statement;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
@@ -42,7 +47,6 @@ import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -54,7 +58,6 @@ import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
 import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
 import org.apache.asterix.metadata.MetadataManager;
@@ -62,14 +65,12 @@ import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -84,7 +85,7 @@ public class CreateProcedureStatement extends ExtensionStatement {
 
     private final FunctionSignature signature;
     private final String procedureBody;
-    private final Statement procedureBodyStatement;
+    private Statement procedureBodyStatement;
     private final List<String> paramList;
     private final List<VariableExpr> varList;
     private final CallExpr period;
@@ -92,22 +93,32 @@ public class CreateProcedureStatement extends ExtensionStatement {
     private List<List<List<String>>> dependencies;
 
     public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
-            List<Integer> paramIds, String functionBody, Statement procedureBodyStatement, Expression period) {
+            List<Integer> paramIds, String functionBody, Expression period) {
         this.signature = signature;
-        this.procedureBody = functionBody;
-        this.procedureBodyStatement = procedureBodyStatement;
         this.paramList = new ArrayList<>();
         this.varList = new ArrayList<>();
         for (int i = 0; i < parameterList.size(); i++) {
-            this.paramList.add(parameterList.get(i).getValue());
-            this.varList.add(new VariableExpr(new VarIdentifier(parameterList.get(i).toString(), paramIds.get(i))));
+            this.paramList.add(parameterList.get(i).getValue().substring(1));
+            this.varList.add(
+                    new VariableExpr(new VarIdentifier(parameterList.get(i).getValue().substring(1), paramIds.get(i))));
         }
+        procedureBody = rewriteJobParams(functionBody);
         this.period = (CallExpr) period;
         this.dependencies = new ArrayList<>();
         this.dependencies.add(new ArrayList<>());
         this.dependencies.add(new ArrayList<>());
     }
 
+    private String rewriteJobParams(String body) {
+        String newBody = body;
+        for (VariableExpr var : varList) {
+            Pattern variableReference = Pattern.compile("([^\\w\\d])" + var.getVar() + "([^\\w\\d]|$)");
+            Matcher matcher = variableReference.matcher(newBody);
+            newBody = matcher.replaceAll("$1get_job_param(\"" + var.getVar() + "\")$2");
+        }
+        return "use " + signature.getNamespace() + ";\n" + newBody + ";";
+    }
+
     public String getProcedureBody() {
         return procedureBody;
     }
@@ -142,7 +153,14 @@ public class CreateProcedureStatement extends ExtensionStatement {
         return null;
     }
 
-    private void initialize() throws MetadataException, HyracksDataException {
+    private void initialize() throws CompilationException, HyracksDataException {
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new StringReader(procedureBody)).parse();
+        if (fStatements.size() != 2) {
+            //TODO: Add a test for this error
+            throw new CompilationException("Procedure can only execute a single statement");
+        }
+        procedureBodyStatement = fStatements.get(1);
         if (period == null) {
             return;
         }
@@ -157,40 +175,6 @@ public class CreateProcedureStatement extends ExtensionStatement {
         durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
     }
 
-    private JobSpecification compileQueryJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, Query q) throws Exception {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        JobSpecification jobSpec = null;
-        try {
-            jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-        } catch (Exception e) {
-            LOGGER.log(Level.INFO, e.getMessage(), e);
-            if (bActiveTxn) {
-                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        }
-        return jobSpec;
-    }
-
-    private void addLets(SelectExpression s) {
-        FunctionIdentifier function = BuiltinFunctions.GET_JOB_PARAMETER;
-        FunctionSignature sig =
-                new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
-        for (VariableExpr var : varList) {
-            List<Expression> strListForCall = new ArrayList<>();
-            LiteralExpr l = new LiteralExpr(new StringLiteral(var.getVar().getValue()));
-            strListForCall.add(l);
-            Expression con = new CallExpr(sig, strListForCall);
-            LetClause let = new LetClause(var, con);
-            s.getLetList().add(let);
-        }
-    }
-
     private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
                     throws Exception {
@@ -207,28 +191,23 @@ public class CreateProcedureStatement extends ExtensionStatement {
                             getProcedureBodyStatement(), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null),
                     PrecompiledType.INSERT);
         } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
-            Query s = (Query) getProcedureBodyStatement();
-            addLets((SelectExpression) s.getBody());
             SqlppRewriterFactory fact = new SqlppRewriterFactory();
             dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
                     ((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(1));
-            Pair<JobSpecification, PrecompiledType> pair = new Pair<>(
-                    compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) getProcedureBodyStatement()),
+            Pair<JobSpecification, PrecompiledType> pair = new Pair<>(BADJobService.compileQueryJob(statementExecutor,
+                    metadataProvider, hcc, (Query) getProcedureBodyStatement()),
                     PrecompiledType.QUERY);
             dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
                     ((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(0));
-            metadataProvider.getLocks().unlock();
             return pair;
         } else if (getProcedureBodyStatement().getKind() == Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
             getProcedureBodyStatement().accept(visitor, null);
             DeleteStatement delete = (DeleteStatement) getProcedureBodyStatement();
-            addLets((SelectExpression) delete.getQuery().getBody());
 
             SqlppRewriterFactory fact = new SqlppRewriterFactory();
             dependencies = FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), delete.getQuery().getBody(),
                     metadataProvider);
-
             Pair<JobSpecification, PrecompiledType> pair =
                     new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
                     getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
@@ -276,24 +255,16 @@ public class CreateProcedureStatement extends ExtensionStatement {
             if (alreadyActive) {
                 throw new AsterixException("Procedure " + signature.getName() + " is already running");
             }
-            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse());
-            tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             metadataProvider.setResultSetId(new ResultSetId(resultSetId++));
             final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
             final IHyracksDataset hdc = requestParameters.getHyracksDataset();
             final Stats stats = requestParameters.getStats();
             boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED;
             metadataProvider.setResultAsyncMode(resultsAsync);
-            tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
-            tempMdProvider.setResultAsyncMode(resultsAsync);
-            tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
-            tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
-            tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
-            tempMdProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
+            metadataProvider.setMaxResultReads(1);
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
+                    createProcedureJob(statementExecutor, metadataProvider, hcc, hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
@@ -301,7 +272,8 @@ public class CreateProcedureStatement extends ExtensionStatement {
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
-            setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
+            setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, metadataProvider.getResultSetId(),
+                    hdc,
                     stats);
 
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 635f2ce..025b9e6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -24,18 +24,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
-import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
-import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -54,12 +50,10 @@ import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.translator.ConstantHelper;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ExecuteProcedureStatement extends ExtensionStatement {
@@ -111,10 +105,9 @@ public class ExecuteProcedureStatement extends ExtensionStatement {
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
-        Procedure procedure = null;
+        Procedure procedure;
 
         MetadataTransactionContext mdTxnCtx = null;
-        JobId jobId;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
@@ -125,31 +118,14 @@ public class ExecuteProcedureStatement extends ExtensionStatement {
             Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
             DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
             if (procedure.getDuration().equals("")) {
+                BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, contextRuntimeVarMap, entityId,
+                        metadataProvider.getTxnIdFactory(), appCtx, listener, (QueryTranslator) statementExecutor);
 
-                //Add the Asterix Transaction Id to the map
-                long newTxId = metadataProvider.getTxnIdFactory().create().getId();
-                contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
-                        String.valueOf(newTxId).getBytes());
-                jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
-
-                boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(
-                        ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
-                if (wait || listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(jobId);
-                }
-
-                if (listener.getType() == PrecompiledType.QUERY) {
-                    ResultReader resultReader =
-                            new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
-
-                    ResultUtil.printResults(appCtx, resultReader,
-                            ((QueryTranslator) statementExecutor).getSessionOutput(), new Stats(), null);
-                }
 
             } else {
-                ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
-                        ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
-                        metadataProvider.getTxnIdFactory());
+                ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+                        BADJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId,
+                        metadataProvider.getTxnIdFactory(), listener);
                 listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
                         listener.getResultId());
             }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index 526e091..1e5e627 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -50,18 +50,20 @@ public class BADMetadataRecordTypes {
     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
     public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
+    public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
     public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_CHANNEL,
             // FieldNames
             new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
                     BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES },
+                    BADConstants.FIELD_NAME_DEPENDENCIES, BADConstants.FIELD_NAME_BODY },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING,
                     new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null),
-                            null) },
+                            null),
+                    BuiltinType.ASTRING },
             //IsOpen?
             true);
     //------------------------------------------ Broker ----------------------------------------//

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index 5f7dad0..ed9346c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -37,6 +37,7 @@ public class Channel implements IExtensionMetadataEntity {
     private final String subscriptionsDatasetName;
     private final String resultsDatasetName;
     private final String duration;
+    private final String channelBody;
     private final FunctionSignature function;
     private final List<String> functionAsPath;
     /*
@@ -49,12 +50,13 @@ public class Channel implements IExtensionMetadataEntity {
     private final List<List<List<String>>> dependencies;
 
     public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
-            FunctionSignature function, String duration, List<List<List<String>>> dependencies) {
+            FunctionSignature function, String duration, List<List<List<String>>> dependencies, String channelBody) {
         this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
         this.subscriptionsDatasetName = subscriptionsDataset;
+        this.channelBody = channelBody;
         if (this.function.getNamespace() == null) {
             this.function.setNamespace(dataverseName);
         }
@@ -94,6 +96,10 @@ public class Channel implements IExtensionMetadataEntity {
         return duration;
     }
 
+    public String getChannelBody() {
+        return channelBody;
+    }
+
     public List<String> getFunctionAsPath() {
         return functionAsPath;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 14db134..175280e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -123,11 +123,15 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
 
         }
 
+        String channelBody =
+                ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
+                        .getStringValue();
+
         FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1),
                 Integer.parseInt(functionSignature.get(2)));
 
         channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration,
-                dependencies);
+                dependencies, channelBody);
         return channel;
     }
 
@@ -217,6 +221,12 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
         dependenciesListBuilder.write(fieldValue.getDataOutput(), true);
         recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX, fieldValue);
 
+        // write field 7
+        fieldValue.reset();
+        aString.setValue(channel.getChannelBody());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 070c148..78f7c95 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.bad.metadata;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActivityState;
@@ -25,39 +27,28 @@ import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
 public class DeployedJobSpecEventListener implements IActiveEntityEventsListener {
 
     private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class);
 
-
     public enum PrecompiledType {
         CHANNEL,
+        PUSH_CHANNEL,
         QUERY,
         INSERT,
         DELETE
     }
 
-    enum RequestState {
-        INIT,
-        STARTED,
-        FINISHED
-    }
-
     private DeployedJobSpecId deployedJobSpecId;
     private ScheduledExecutorService executorService = null;
     private final PrecompiledType type;
@@ -67,14 +58,11 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener
 
     // members
     protected volatile ActivityState state;
-    protected JobId jobId;
-    protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
     protected final ICcApplicationContext appCtx;
     protected final EntityId entityId;
     protected final ActiveEvent statsUpdatedEvent;
     protected long statsTimestamp;
     protected String stats;
-    protected RequestState statsRequestState;
     protected final String runtimeName;
     protected final AlgebricksAbsolutePartitionConstraint locations;
     private int runningInstance;
@@ -83,18 +71,15 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener
             AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
         this.appCtx = appCtx;
         this.entityId = entityId;
-        this.state = ActivityState.STOPPED;
+        setState(ActivityState.STOPPED);
         this.statsTimestamp = -1;
-        this.statsRequestState = RequestState.INIT;
         this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
         this.stats = "{\"Stats\":\"N/A\"}";
         this.runtimeName = runtimeName;
         this.locations = locations;
-        state = ActivityState.STOPPED;
         this.type = type;
     }
 
-
     public IHyracksDataset getResultDataset() {
         return hdc;
     }
@@ -122,10 +107,6 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener
         return false;
     }
 
-    public JobId getJobId() {
-        return jobId;
-    }
-
     @Override
     public String getStats() {
         return stats;
@@ -136,40 +117,6 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener
         return statsTimestamp;
     }
 
-    public String formatStats(List<String> responses) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append("{\"Stats\": [").append(responses.get(0));
-        for (int i = 1; i < responses.size(); i++) {
-            strBuilder.append(", ").append(responses.get(i));
-        }
-        strBuilder.append("]}");
-        return strBuilder.toString();
-    }
-
-    protected synchronized void notifySubscribers(ActiveEvent event) {
-        notifyAll();
-        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
-        while (it.hasNext()) {
-            IActiveEntityEventSubscriber subscriber = it.next();
-            if (subscriber.isDone()) {
-                it.remove();
-            } else {
-                try {
-                    subscriber.notify(event);
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
-                }
-                if (subscriber.isDone()) {
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    public AlgebricksAbsolutePartitionConstraint getLocations() {
-        return locations;
-    }
-
     public PrecompiledType getType() {
         return type;
     }
@@ -214,12 +161,18 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener
         // no op
     }
 
+    protected synchronized void setState(ActivityState newState) {
+        LOGGER.info("State of " + getEntityId() + "is being set to " + newState + " from " + state);
+        this.state = newState;
+        notifyAll();
+    }
+
     private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job started for  " + entityId);
         }
         runningInstance++;
-        state = ActivityState.RUNNING;
+        setState(ActivityState.RUNNING);
     }
 
     private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
@@ -228,8 +181,32 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener
         }
         runningInstance--;
         if (runningInstance == 0) {
-            state = ActivityState.STOPPED;
+            setState(ActivityState.STOPPED);
+        }
+    }
+
+    public synchronized void waitWhileAtState(ActivityState undesiredState) throws InterruptedException {
+        while (state == undesiredState) {
+            this.wait();
+        }
+    }
+
+    public synchronized void suspend() throws HyracksDataException, InterruptedException {
+        LOGGER.info("Suspending entity " + entityId);
+        LOGGER.info("Waiting for ongoing activities of " + entityId);
+        waitWhileAtState(ActivityState.RUNNING);
+        LOGGER.info("Proceeding with suspension of " + entityId + ". Current state is " + state);
+        setState(ActivityState.SUSPENDED);
+        LOGGER.info("Successfully Suspended " + entityId);
+    }
+
+    public synchronized void resume() throws HyracksDataException {
+        LOGGER.info("Resuming entity " + entityId);
+        if (state != ActivityState.SUSPENDED) {
+            throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE, entityId, state);
         }
+        setState(ActivityState.STOPPED);
+        LOGGER.info("Successfully resumed " + entityId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index 5712539..50d506b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -28,10 +28,7 @@ import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
 
 public class Procedure implements IExtensionMetadataEntity {
     private static final long serialVersionUID = 1L;
-    public static final String LANGUAGE_JAVA = "JAVA";
-
     public static final String RETURNTYPE_VOID = "VOID";
-    public static final String NOT_APPLICABLE = "N/A";
 
     private final EntityId procedureId;
     private final int arity;

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 2d7ba75..4c83dc5 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -161,7 +161,7 @@ CreateProcedureStatement ProcedureSpecification() throws ParseException:
     }
   ("period" period = FunctionCallExpr())?
   {
-  return new CreateProcedureStatement(signature, paramList, paramIds, functionBody, functionBodyExpr, period);
+  return new CreateProcedureStatement(signature, paramList, paramIds, functionBody, period);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
new file mode 100644
index 0000000..1cd49e3
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BADListenerTest {
+
+    private static DeployedJobSpecEventListener djsel;
+
+    private class suspend extends Thread {
+        @Override
+        public void run() {
+            try {
+                djsel.suspend();
+                Thread.sleep(5000);
+                djsel.resume();
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    private class run extends Thread {
+        @Override
+        public void run() {
+            try {
+                djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_STARTED, null, null));
+                djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_STARTED, null, null));
+                djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_FINISHED, null, null));
+                Thread.sleep(5000);
+                djsel.notify(new ActiveEvent(null, ActiveEvent.Kind.JOB_FINISHED, null, null));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    @BeforeClass
+    public static void init() {
+        djsel = new DeployedJobSpecEventListener(null,
+                new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, "test", "test"),
+                DeployedJobSpecEventListener.PrecompiledType.CHANNEL, null, "BadListener");
+    }
+
+    @Test
+    public void DistributedTest() throws Exception {
+        new suspend().run();
+        djsel.waitWhileAtState(ActivityState.SUSPENDED);
+        new run().run();
+        djsel.suspend();
+    }
+
+    @AfterClass
+    public static void deinit() throws Exception {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/0da2d001/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
new file mode 100644
index 0000000..819d052
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  location: circle,
+  userName: string
+};
+
+
+create type EmergencyReport as {
+  reportId: uuid,
+  Etype: string,
+  location: circle
+};
+
+
+create type EmergencyShelter as {
+  shelterName: string,
+  location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index u_location on UserLocations(location) type RTREE;
+
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  select report, shelters from
+   ( select value r from Reports r)report,
+  UserLocations u
+    let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+  where u.userName = userName
+  and spatial_intersect(report.location,u.location)
+  )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
+
+create broker brokerA at "http://www.notifyA.com";
\ No newline at end of file