Posted to commits@asterixdb.apache.org by sj...@apache.org on 2017/11/15 01:10:59 UTC

[2/2] asterixdb-bad git commit: Added parameterized procedures

Added parameterized procedures

Add tests, including concurrent and parameterized execution.
Delete and query procedures can both take parameters; parameter values are bound at runtime through Asterix job parameters.
Add a timestamp index on channel results.
Clean up result handling for query procedures.
Prevent repetitive jobs from running multiple iterations concurrently.

Change-Id: I999879b1cae0de179a1d3c232fa940228979f4fe
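For context, the lifecycle this change builds on (visible in the CreateChannelStatement, CreateProcedureStatement, and ExecuteProcedureStatement diffs below) is: compile the body once, deploy it as a Hyracks deployed job spec, and bind argument values per invocation through a runtime parameter map. Below is a minimal sketch of that flow using only the client calls that appear in this patch; the helper class and method names are illustrative, not part of the commit.

    import java.util.Map;

    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.DeployedJobSpecId;
    import org.apache.hyracks.api.job.JobId;
    import org.apache.hyracks.api.job.JobSpecification;

    final class DeployedProcedureSketch {
        // Deploy the precompiled body once (create procedure), run it with
        // per-invocation parameters (execute), and undeploy it on drop.
        static void lifecycle(IHyracksClientConnection hcc, JobSpecification compiledBody,
                Map<byte[], byte[]> runtimeParameters) throws Exception {
            DeployedJobSpecId deployedId = hcc.deployJobSpec(compiledBody); // create procedure
            JobId invocation = hcc.startJob(deployedId, runtimeParameters); // execute, binding arguments now
            hcc.waitForCompletion(invocation);                              // wait for this invocation only
            hcc.undeployJobSpec(deployedId);                                // drop procedure
        }
    }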


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/8b53ce55
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/8b53ce55
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/8b53ce55

Branch: refs/heads/master
Commit: 8b53ce556eaac45c6698e448687f9e77eecc85e8
Parents: 1f36ec7
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Tue Nov 14 10:21:43 2017 -0800
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Tue Nov 14 10:21:43 2017 -0800

----------------------------------------------------------------------
 asterix-bad/pom.xml                             |  10 +
 .../org/apache/asterix/bad/BADConstants.java    |   2 +
 .../apache/asterix/bad/ChannelJobService.java   |  45 +--
 .../lang/statement/ChannelDropStatement.java    |  53 ++--
 .../lang/statement/CreateChannelStatement.java  |  61 +++--
 .../statement/CreateProcedureStatement.java     | 119 +++++---
 .../statement/ExecuteProcedureStatement.java    |  92 ++++++-
 .../lang/statement/ProcedureDropStatement.java  |  37 ++-
 .../metadata/DeployedJobSpecEventListener.java  | 271 +++++++++++++++++++
 .../metadata/PrecompiledJobEventListener.java   | 249 -----------------
 .../resources/asterix-build-configuration.xml   | 110 ++++++++
 .../src/main/resources/lang-extension/lang.txt  |  18 +-
 .../test/BADAsterixHyracksIntegrationUtil.java  |  60 ++++
 .../asterix/bad/test/BADExecutionTest.java      |   2 +-
 .../asterix/bad/test/BADOptimizerTest.java      |   2 +-
 .../conf/asterix-build-configuration.xml        | 110 --------
 .../queries/channel/channel-advanced.sqlpp      |  70 +++++
 .../results/channel/channel-advanced.plan       | 104 +++++++
 .../results/channel/channel-create.plan         |  64 +++--
 .../results/channel/channel-subscribe.plan      |  64 +++--
 .../results/channel/channel-unsubscribe.plan    |  64 +++--
 .../disasters_with_friends.1.ddl.sqlpp          |  70 +++++
 .../disasters_with_friends.2.update.sqlpp       |  27 ++
 .../disasters_with_friends.3.update.sqlpp       |  41 +++
 .../disasters_with_friends.4.sleep.sqlpp        |  26 ++
 .../disasters_with_friends.5.query.sqlpp        |  28 ++
 .../disasters_with_friends.6.ddl.sqlpp          |  28 ++
 .../ten_minute_channel.1.ddl.sqlpp              |  10 +-
 .../ten_minute_channel.4.sleep.sqlpp            |   2 +-
 .../ten_minute_channel.5.query.sqlpp            |   4 +-
 .../concurrent_procedure.1.ddl.sqlpp            |  44 +++
 .../concurrent_procedure.2.update.sqlpp         |  32 +++
 .../concurrent_procedure.3.ddl.sqlpp            |  27 ++
 .../concurrent_procedure.4.query.sqlpp          |  27 ++
 ...delete_procedure_with_parameters.1.ddl.sqlpp |  44 +++
 ...lete_procedure_with_parameters.2.query.sqlpp |  27 ++
 ...ete_procedure_with_parameters.3.update.sqlpp |  27 ++
 ...lete_procedure_with_parameters.4.query.sqlpp |  27 ++
 .../query_procedure_with_parameters.1.ddl.sqlpp |  45 +++
 ...ery_procedure_with_parameters.2.update.sqlpp |  27 ++
 ...ery_procedure_with_parameters.3.update.sqlpp |  27 ++
 .../disasters_with_friends.1.adm                |   1 +
 .../concurrent_procedure.4.adm                  |   1 +
 .../delete_procedure_with_parameters.1.adm      |   1 +
 .../delete_procedure_with_parameters.2.adm      |   1 +
 .../query_procedure_with_parameters.1.adm       |   2 +
 .../query_procedure_with_parameters.2.adm       |   2 +
 .../src/test/resources/runtimets/testsuite.xml  |  92 ++++---
 48 files changed, 1631 insertions(+), 666 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index 8738162..4f85b0a 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -244,6 +244,11 @@
       <version>${asterix.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-lang-common</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
@@ -289,6 +294,11 @@
       <version>${asterix.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-transactions</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
       <version>${hyracks.version}</version>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index a906ae6..548f1ba 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -49,6 +49,8 @@ public interface BADConstants {
     public static final String FIELD_NAME_RETURN_TYPE = "ReturnType";
     public static final String FIELD_NAME_DEFINITION = "Definition";
     public static final String FIELD_NAME_LANGUAGE = "Language";
+    //To enable new Asterix TxnId for separate deployed job spec invocations
+    public static final byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
 
     public enum ChannelJobType {
         REPETITIVE
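A deployed job spec is compiled once but invoked many times, so each invocation needs its own Asterix transaction id; the constant above names the job parameter that carries it. ExecuteProcedureStatement (further down in this diff) fills it in roughly as in the sketch below; the helper name is illustrative.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.asterix.bad.BADConstants;
    import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;

    final class TxnIdParameterSketch {
        // Puts a fresh transaction id into the runtime parameter map of one invocation.
        static Map<byte[], byte[]> withFreshTxnId() throws Exception {
            Map<byte[], byte[]> params = new HashMap<>();
            params.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
                    String.valueOf(TxnIdFactory.create().getId()).getBytes());
            return params;
        }
    }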

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index ae24e0e..1db0669 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -23,66 +23,25 @@ import java.io.DataOutputStream;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.EnumSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.AUUID;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 
 /**
- * Provides functionality for running channel jobs and communicating with Brokers
+ * Provides functionality for channel jobs and communicating with Brokers
  */
 public class ChannelJobService {
 
     private static final Logger LOGGER = Logger.getLogger(ChannelJobService.class.getName());
 
-    public static ScheduledExecutorService startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
-            IHyracksClientConnection hcc, long duration)
-            throws Exception {
-        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
-        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    executeJob(jobSpec, jobFlags, jobId, hcc);
-                } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
-                }
-            }
-        }, duration, duration, TimeUnit.MILLISECONDS);
-        return scheduledExecutorService;
-    }
-
-    public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId,
-            IHyracksClientConnection hcc)
-            throws Exception {
-        LOGGER.info("Executing Channel Job");
-        if (jobId == null) {
-            hcc.startJob(jobSpec, jobFlags);
-        } else {
-            hcc.startJob(jobId);
-        }
-    }
-
-    public static void runChannelJob(JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
-        JobId jobId = hcc.startJob(channeljobSpec);
-        hcc.waitForCompletion(jobId);
-    }
-
     public static void sendBrokerNotificationsForChannel(EntityId activeJobId, String brokerEndpoint,
             AOrderedList subscriptionIds, String channelExecutionTime) throws HyracksDataException {
         String formattedString;
-            formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
+        formattedString = formatJSON(activeJobId, subscriptionIds, channelExecutionTime);
         sendMessage(brokerEndpoint, formattedString);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index f4ea2f3..2b189be 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -25,7 +29,7 @@ import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
@@ -38,11 +42,11 @@ import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 
 public class ChannelDropStatement implements IExtensionStatement {
+    private static final Logger LOGGER = Logger.getLogger(ChannelDropStatement.class.getName());
 
     private final Identifier dataverseName;
     private final Identifier channelName;
@@ -91,7 +95,7 @@ public class ChannelDropStatement implements IExtensionStatement {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -109,22 +113,30 @@ public class ChannelDropStatement implements IExtensionStatement {
                 }
             }
 
-            listener.getExecutorService().shutdownNow();
-            JobId hyracksJobId = listener.getJobId();
-            listener.deActivate();
-            activeEventHandler.unregisterListener(listener);
-            if (hyracksJobId != null) {
-                hcc.destroyJob(hyracksJobId);
-                // wait for job completion to release any resources to be dropped
-                ensureJobDestroyed(hcc, hyracksJobId);
+            if (listener == null) {
+                //TODO: Channels need to better handle cluster failures
+                LOGGER.log(Level.SEVERE,
+                        "Tried to drop a Deployed Job  whose listener no longer exists:  "
+                                + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                                + entityId.getEntityName() + ".");
+
+            } else {
+                listener.getExecutorService().shutdown();
+                listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+                DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
+                listener.deActivate();
+                activeEventHandler.unregisterListener(listener);
+                if (deployedJobSpecId != null) {
+                    hcc.undeployJobSpec(deployedJobSpecId);
+                }
             }
-
             //Create a metadata provider to use in nested jobs.
             MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.
             //TODO: Prevent datasets for Channels from being dropped elsewhere
+
             DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse),
                     new Identifier(channel.getResultsDatasetName()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null);
@@ -147,19 +159,4 @@ public class ChannelDropStatement implements IExtensionStatement {
         }
     }
 
-    private void ensureJobDestroyed(IHyracksClientConnection hcc, JobId hyracksJobId) throws Exception {
-        try {
-            hcc.waitForCompletion(hyracksJobId);
-        } catch (Exception e) {
-            // if the job has already been destroyed, it is safe to complete
-            if (e instanceof HyracksDataException) {
-                HyracksDataException hde = (HyracksDataException) e;
-                if (hde.getComponent().equals(ErrorCode.HYRACKS)
-                        && hde.getErrorCode() == ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY) {
-                    return;
-                }
-            }
-            throw e;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 6d04f20..0898eec 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -22,13 +22,13 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.StringReader;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -38,9 +38,10 @@ import org.apache.asterix.bad.ChannelJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -50,8 +51,10 @@ import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.IndexedTypeExpression;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
 import org.apache.asterix.lang.common.statement.InsertStatement;
@@ -69,11 +72,11 @@ import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
@@ -89,16 +92,14 @@ public class CreateChannelStatement implements IExtensionStatement {
     private InsertStatement channelResultsInsertQuery;
     private String subscriptionsTableName;
     private String resultsTableName;
-    private boolean distributed;
 
     public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
-            Expression period, boolean distributed) {
+            Expression period) {
         this.channelName = channelName;
         this.dataverseName = dataverseName;
         this.function = function;
         this.period = (CallExpr) period;
         this.duration = "";
-        this.distributed = distributed;
     }
 
     public Identifier getDataverseName() {
@@ -196,12 +197,32 @@ public class CreateChannelStatement implements IExtensionStatement {
                 new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
                 new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
 
+        //Create an index on timestamp for results
+        CreateIndexStatement createTimeIndex = new CreateIndexStatement();
+        createTimeIndex.setDatasetName(resultsName);
+        createTimeIndex.setDataverseName(new Identifier(dataverse));
+        createTimeIndex.setIndexName(new Identifier(resultsName + "TimeIndex"));
+        createTimeIndex.setIfNotExists(false);
+        createTimeIndex.setIndexType(IndexType.BTREE);
+        createTimeIndex.setEnforced(false);
+        createTimeIndex.setGramLength(0);
+        List<String> fNames = new ArrayList<>();
+        fNames.add(BADConstants.ChannelExecutionTime);
+        Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null);
+        createTimeIndex.addFieldExprPair(fields);
+        createTimeIndex.addFieldIndexIndicator(0);
+
+
         //Run both statements to create datasets
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
                 hcc, null);
         metadataProvider.getLocks().reset();
         ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc,
                 null);
+        metadataProvider.getLocks().reset();
+
+        //Create a time index for the results
+        ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc, null);
 
     }
 
@@ -240,16 +261,12 @@ public class CreateChannelStatement implements IExtensionStatement {
     }
 
     private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
+            DeployedJobSpecEventListener listener) throws Exception {
         if (channeljobSpec != null) {
-            //TODO: Find a way to fix optimizer tests so we don't need this check
-            JobId jobId = null;
-            if (predistributed) {
-                jobId = hcc.distributeJob(channeljobSpec);
-            }
-            ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
-                    jobId, hcc, ChannelJobService.findPeriod(duration));
-            listener.storeDistributedInfo(jobId, ses, null);
+            DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec);
+            ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
+                    ChannelJobService.findPeriod(duration), new HashMap<>(), entityId);
+            listener.storeDistributedInfo(destributedId, ses, null, null);
         }
 
     }
@@ -275,7 +292,7 @@ public class CreateChannelStatement implements IExtensionStatement {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Channel channel = null;
 
@@ -322,16 +339,12 @@ public class CreateChannelStatement implements IExtensionStatement {
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()));
                 datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
+                listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
 
-            if (distributed) {
-                setupExecutorJob(entityId, channeljobSpec, hcc, listener, true);
-            } else {
-                setupExecutorJob(entityId, channeljobSpec, hcc, listener, false);
-            }
+            setupExecutorJob(entityId, channeljobSpec, hcc, listener);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
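setupExecutorJob above now deploys the channel job spec once and hands the DeployedJobSpecId to DeployedJobService.startRepetitiveDeployedJobSpec, which lives in the asterixdb core repository and is not shown in this diff. The commit message's "prevent repetitive jobs from running multiple iterations concurrently" refers to that scheduler; the following is only a sketch of such a guard, under the assumption that an overlapping tick is simply skipped (class and method names are illustrative).

    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.DeployedJobSpecId;
    import org.apache.hyracks.api.job.JobId;

    final class RepetitiveRunnerSketch {
        // Runs a deployed job spec every periodMillis, skipping a tick if the
        // previous invocation has not completed yet (no overlapping iterations).
        static ScheduledExecutorService start(IHyracksClientConnection hcc, DeployedJobSpecId id,
                Map<byte[], byte[]> params, long periodMillis) {
            ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
            AtomicBoolean running = new AtomicBoolean(false);
            ses.scheduleAtFixedRate(() -> {
                if (!running.compareAndSet(false, true)) {
                    return; // previous iteration still in flight; skip this tick
                }
                try {
                    JobId jobId = hcc.startJob(id, params);
                    hcc.waitForCompletion(jobId);
                } catch (Exception e) {
                    // a real scheduler would log and decide whether to keep running
                } finally {
                    running.set(false);
                }
            }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            return ses;
        }
    }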

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 0666b38..adfa485 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -20,7 +20,6 @@ package org.apache.asterix.bad.lang.statement;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
@@ -29,13 +28,11 @@ import java.util.logging.Logger;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.lang.BADParserFactory;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -44,30 +41,36 @@ import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
@@ -76,26 +79,35 @@ public class CreateProcedureStatement implements IExtensionStatement {
     private static final Logger LOGGER = Logger.getLogger(CreateProcedureStatement.class.getName());
 
     private final FunctionSignature signature;
-    private final String functionBody;
+    private final String procedureBody;
+    private final Statement procedureBodyStatement;
     private final List<String> paramList;
+    private final List<VariableExpr> varList;
     private final CallExpr period;
     private String duration = "";
 
-    public String getFunctionBody() {
-        return functionBody;
-    }
-
-    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList, String functionBody,
-            Expression period) {
+    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
+            List<Integer> paramIds, String functionBody, Statement procedureBodyStatement, Expression period) {
         this.signature = signature;
-        this.functionBody = functionBody;
+        this.procedureBody = functionBody;
+        this.procedureBodyStatement = procedureBodyStatement;
         this.paramList = new ArrayList<>();
-        for (VarIdentifier varId : parameterList) {
-            this.paramList.add(varId.getValue());
+        this.varList = new ArrayList<>();
+        for (int i = 0; i < parameterList.size(); i++) {
+            this.paramList.add(parameterList.get(i).getValue());
+            this.varList.add(new VariableExpr(new VarIdentifier(parameterList.get(i).toString(), paramIds.get(i))));
         }
         this.period = (CallExpr) period;
     }
 
+    public String getProcedureBody() {
+        return procedureBody;
+    }
+
+    public Statement getProcedureBodyStatement() {
+        return procedureBodyStatement;
+    }
+
     @Override
     public byte getKind() {
         return Kind.EXTENSION;
@@ -105,6 +117,10 @@ public class CreateProcedureStatement implements IExtensionStatement {
         return paramList;
     }
 
+    public List<VariableExpr> getVarList() {
+        return varList;
+    }
+
     public FunctionSignature getSignature() {
         return signature;
     }
@@ -158,43 +174,56 @@ public class CreateProcedureStatement implements IExtensionStatement {
         return jobSpec;
     }
 
-    private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
-            IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, Stats stats) throws Exception {
-        StringBuilder builder = new StringBuilder();
-        builder.append(body);
-        builder.append(";");
-        BADParserFactory factory = new BADParserFactory();
-        List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
-        if (fStatements.size() > 1) {
-            throw new CompilationException("Procedure can only execute a single statement");
+    private void addLets(SelectExpression s) {
+        FunctionIdentifier function = BuiltinFunctions.GET_JOB_PARAMETER;
+        FunctionSignature sig =
+                new FunctionSignature(function.getNamespace(), function.getName(), function.getArity());
+        for (VariableExpr var : varList) {
+            List<Expression> strListForCall = new ArrayList<>();
+            LiteralExpr l = new LiteralExpr(new StringLiteral(var.getVar().getValue()));
+            strListForCall.add(l);
+            Expression con = new CallExpr(sig, strListForCall);
+            LetClause let = new LetClause(var, con);
+            s.getLetList().add(let);
         }
-        if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
+    }
+
+    private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
+                    throws Exception {
+        if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
+            if (!varList.isEmpty()) {
+                throw new CompilationException("Insert procedures cannot have parameters");
+            }
             return new Pair<>(
                     ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                            fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null),
+                            getProcedureBodyStatement(), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null),
                     PrecompiledType.INSERT);
-        } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
-            Pair<JobSpecification, PrecompiledType> pair =
-                    new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
-                            PrecompiledType.QUERY);
+        } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) {
+            Query s = (Query) getProcedureBodyStatement();
+            addLets((SelectExpression) s.getBody());
+            Pair<JobSpecification, PrecompiledType> pair = new Pair<>(
+                    compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) getProcedureBodyStatement()),
+                    PrecompiledType.QUERY);
             metadataProvider.getLocks().unlock();
             return pair;
-        } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
+        } else if (getProcedureBodyStatement().getKind() == Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
-            fStatements.get(0).accept(visitor, null);
+            getProcedureBodyStatement().accept(visitor, null);
+            DeleteStatement delete = (DeleteStatement) getProcedureBodyStatement();
+            addLets((SelectExpression) delete.getQuery().getBody());
             return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
-                    fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+                    getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE);
         } else {
             throw new CompilationException("Procedure can only execute a single delete, insert, or query");
         }
     }
 
-    private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
-            PrecompiledJobEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
+    private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
+            DeployedJobSpecEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats)
             throws Exception {
-        JobId jobId = hcc.distributeJob(jobSpec);
-        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
+        DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec);
+        listener.storeDistributedInfo(deployedJobSpecId, null, hdc, resultSetId);
     }
 
     @Override
@@ -208,7 +237,7 @@ public class CreateProcedureStatement implements IExtensionStatement {
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         boolean alreadyActive = false;
         Procedure procedure = null;
 
@@ -228,7 +257,7 @@ public class CreateProcedureStatement implements IExtensionStatement {
                 throw new AsterixException("Procedure " + signature.getName() + " is already running");
             }
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
-                    Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
+                    Function.RETURNTYPE_VOID, getProcedureBody(), Function.LANGUAGE_AQL, duration);
             MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
@@ -246,16 +275,18 @@ public class CreateProcedureStatement implements IExtensionStatement {
 
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(getFunctionBody(), statementExecutor, tempMdProvider, hcc, hdc, stats);
+                    createProcedureJob(statementExecutor, tempMdProvider, hcc, hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(appCtx, entityId, procedureJobSpec.second, new ArrayList<>(),
+                listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second,
+                        new ArrayList<>(),
                         null, "BadListener");
                 activeEventHandler.registerListener(listener);
             }
-            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc,
+            setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(),
+                    hdc,
                     stats);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 0dbd0a3..b6c66dc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -18,9 +18,13 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.util.EnumSet;
+import java.io.DataOutput;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.http.server.ResultUtil;
@@ -30,35 +34,47 @@ import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.ChannelJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.asterix.translator.ConstantHelper;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ExecuteProcedureStatement implements IExtensionStatement {
 
     private final String dataverseName;
     private final String procedureName;
     private final int arity;
+    private final List<Expression> argList;
 
-    public ExecuteProcedureStatement(String dataverseName, String procedureName, int arity) {
+    public ExecuteProcedureStatement(String dataverseName, String procedureName, int arity, List<Expression> argList) {
         this.dataverseName = dataverseName;
         this.procedureName = procedureName;
         this.arity = arity;
+        this.argList = argList;
     }
 
     public String getDataverseName() {
@@ -98,7 +114,7 @@ public class ExecuteProcedureStatement implements IExtensionStatement {
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -109,22 +125,30 @@ public class ExecuteProcedureStatement implements IExtensionStatement {
             if (procedure == null) {
                 throw new AlgebricksException("There is no procedure with this name " + procedureName + ".");
             }
-
-            JobId hyracksJobId = listener.getJobId();
+            Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure);
+            DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
             if (procedure.getDuration().equals("")) {
-                hcc.startJob(hyracksJobId);
+
+                //Add the Asterix Transaction Id to the map
+                contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
+                        String.valueOf(TxnIdFactory.create().getId()).getBytes());
+                JobId jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
 
                 if (listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(hyracksJobId);
-                    ResultReader resultReader = listener.getResultReader();
+                    hcc.waitForCompletion(jobId);
+                    ResultReader resultReader =
+                            new ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
+
                     ResultUtil.printResults(appCtx, resultReader,
                             ((QueryTranslator) statementExecutor).getSessionOutput(), new Stats(), null);
                 }
 
             } else {
-                ScheduledExecutorService ses = ChannelJobService.startJob(null, EnumSet.noneOf(JobFlag.class),
-                        hyracksJobId, hcc, ChannelJobService.findPeriod(procedure.getDuration()));
-                listener.storeDistributedInfo(hyracksJobId, ses, listener.getResultReader());
+                ScheduledExecutorService ses =
+                        DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+                                ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId);
+                listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(),
+                        listener.getResultId());
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -140,4 +164,44 @@ public class ExecuteProcedureStatement implements IExtensionStatement {
         }
     }
 
+    private Map<byte[], byte[]> createParameterMap(Procedure procedure)
+            throws AsterixException, HyracksDataException {
+        Map<byte[], byte[]> map = new HashMap<>();
+        if (procedure.getParams().size() != argList.size()) {
+            throw AsterixException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
+                    procedure.getEntityId().getEntityName(), argList.size());
+        }
+        ArrayBackedValueStorage abvsKey = new ArrayBackedValueStorage();
+        DataOutput dosKey = abvsKey.getDataOutput();
+        ArrayBackedValueStorage abvsValue = new ArrayBackedValueStorage();
+        DataOutput dosValue = abvsValue.getDataOutput();
+
+        for (int i = 0; i < procedure.getParams().size(); i++) {
+            if (!(argList.get(i) instanceof LiteralExpr)) {
+                //TODO handle nonliteral arguments to procedure
+                throw AsterixException.create(ErrorCode.TYPE_UNSUPPORTED, procedure.getEntityId().getEntityName(),
+                        argList.get(i).getClass());
+            }
+            //Turn the argument name into a byte array
+            IAObject str = new AString(procedure.getParams().get(i));
+            abvsKey.reset();
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(str.getType()).serialize(str, dosKey);
+            //We do not save the type tag of the string key
+            byte[] key = new byte[abvsKey.getLength() - 1];
+            System.arraycopy(abvsKey.getByteArray(), 1, key, 0, abvsKey.getLength() - 1);
+
+            //Turn the argument value into a byte array
+            IAObject object = ConstantHelper.objectFromLiteral(((LiteralExpr) argList.get(i)).getValue());
+            abvsValue.reset();
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(object.getType()).serialize(object,
+                    dosValue);
+            byte[] value = new byte[abvsValue.getLength()];
+            System.arraycopy(abvsValue.getByteArray(), abvsValue.getStartOffset(), value, 0, abvsValue.getLength());
+
+            map.put(key, value);
+        }
+
+        return map;
+    }
+
 }
\ No newline at end of file
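createParameterMap above turns each (parameter name, literal argument) pair into byte arrays for the runtime parameter map: the key is the serialized AString name with its leading type tag stripped, while the value keeps its tag so the runtime can interpret its type. A compact sketch of that serialization for a single string argument follows; the helper name is illustrative.

    import java.io.DataOutput;
    import java.util.Map;

    import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
    import org.apache.asterix.om.base.AString;
    import org.apache.asterix.om.base.IAObject;
    import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;

    final class ParameterMapSketch {
        // Serializes one (name, string value) pair the way createParameterMap does:
        // the key drops the leading type tag, the value keeps it.
        static void putStringParameter(Map<byte[], byte[]> map, String name, String value) throws Exception {
            ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
            DataOutput out = storage.getDataOutput();

            IAObject key = new AString(name);
            storage.reset();
            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(key.getType()).serialize(key, out);
            byte[] keyBytes = new byte[storage.getLength() - 1];
            System.arraycopy(storage.getByteArray(), 1, keyBytes, 0, keyBytes.length); // strip the type tag

            IAObject val = new AString(value);
            storage.reset();
            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(val.getType()).serialize(val, out);
            byte[] valBytes = new byte[storage.getLength()];
            System.arraycopy(storage.getByteArray(), storage.getStartOffset(), valBytes, 0, valBytes.length);

            map.put(keyBytes, valBytes);
        }
    }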

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 3c618ae..18e769d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -18,13 +18,17 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -39,9 +43,10 @@ import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
 
 public class ProcedureDropStatement implements IExtensionStatement {
+    private static final Logger LOGGER = Logger.getLogger(ProcedureDropStatement.class.getName());
 
     private final FunctionSignature signature;
     private boolean ifExists;
@@ -87,7 +92,7 @@ public class ProcedureDropStatement implements IExtensionStatement {
         signature.setNamespace(dataverse);
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
+        DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId);
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -106,14 +111,24 @@ public class ProcedureDropStatement implements IExtensionStatement {
                 }
             }
 
-            if (listener.getExecutorService() != null) {
-                listener.getExecutorService().shutdownNow();
-            }
-            JobId hyracksJobId = listener.getJobId();
-            listener.deActivate();
-            activeEventHandler.unregisterListener(listener);
-            if (hyracksJobId != null) {
-                hcc.destroyJob(hyracksJobId);
+            if (listener == null) {
+                //TODO: Channels need to better handle cluster failures
+                LOGGER.log(Level.SEVERE,
+                        "Tried to drop a Deployed Job  whose listener no longer exists:  "
+                                + entityId.getExtensionName() + " " + entityId.getDataverse() + "."
+                                + entityId.getEntityName() + ".");
+            } else {
+                if (listener.getExecutorService() != null) {
+                    listener.getExecutorService().shutdown();
+                    listener.getExecutorService().awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+                }
+                DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId();
+                listener.deActivate();
+                activeEventHandler.unregisterListener(listener);
+                if (deployedJobSpecId != null) {
+                    hcc.undeployJobSpec(deployedJobSpecId);
+                }
+
             }
 
             //Remove the Channel Metadata

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
new file mode 100644
index 0000000..950612c
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class DeployedJobSpecEventListener implements IActiveEntityEventsListener {
+
+    private static final Logger LOGGER = Logger.getLogger(DeployedJobSpecEventListener.class);
+
+
+    public enum PrecompiledType {
+        CHANNEL,
+        QUERY,
+        INSERT,
+        DELETE
+    }
+
+    enum RequestState {
+        INIT,
+        STARTED,
+        FINISHED
+    }
+
+    private DeployedJobSpecId deployedJobSpecId;
+    private ScheduledExecutorService executorService = null;
+    private ResultReader resultReader;
+    private final PrecompiledType type;
+
+    private IHyracksDataset hdc;
+    private ResultSetId resultSetId;
+    // members
+    protected volatile ActivityState state;
+    protected JobId jobId;
+    protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
+    protected final ICcApplicationContext appCtx;
+    protected final EntityId entityId;
+    protected final List<IDataset> datasets;
+    protected final ActiveEvent statsUpdatedEvent;
+    protected long statsTimestamp;
+    protected String stats;
+    protected RequestState statsRequestState;
+    protected final String runtimeName;
+    protected final AlgebricksAbsolutePartitionConstraint locations;
+    protected int numRegistered;
+
+    public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
+            List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
+        this.appCtx = appCtx;
+        this.entityId = entityId;
+        this.datasets = datasets;
+        this.state = ActivityState.STOPPED;
+        this.statsTimestamp = -1;
+        this.statsRequestState = RequestState.INIT;
+        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
+        this.stats = "{\"Stats\":\"N/A\"}";
+        this.runtimeName = runtimeName;
+        this.locations = locations;
+        this.numRegistered = 0;
+        state = ActivityState.STOPPED;
+        this.type = type;
+    }
+
+    public IHyracksDataset getResultDataset() {
+        return hdc;
+    }
+
+    public ResultSetId getResultId() {
+        return resultSetId;
+    }
+
+    public DeployedJobSpecId getDeployedJobSpecId() {
+        return deployedJobSpecId;
+    }
+
+    protected synchronized void handle(ActivePartitionMessage message) {
+        if (message.getEvent() == ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
+            numRegistered++;
+            if (numRegistered == locations.getLocations().length) {
+                state = ActivityState.RUNNING;
+            }
+        }
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public ActivityState getState() {
+        return state;
+    }
+
+    @Override
+    public boolean isEntityUsingDataset(IDataset dataset) {
+        return datasets.contains(dataset);
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public String getStats() {
+        return stats;
+    }
+
+    @Override
+    public long getStatsTimeStamp() {
+        return statsTimestamp;
+    }
+
+    public String formatStats(List<String> responses) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append("{\"Stats\": [").append(responses.get(0));
+        for (int i = 1; i < responses.size(); i++) {
+            strBuilder.append(", ").append(responses.get(i));
+        }
+        strBuilder.append("]}");
+        return strBuilder.toString();
+    }
+
+    protected synchronized void notifySubscribers(ActiveEvent event) {
+        notifyAll();
+        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
+        while (it.hasNext()) {
+            IActiveEntityEventSubscriber subscriber = it.next();
+            if (subscriber.isDone()) {
+                it.remove();
+            } else {
+                try {
+                    subscriber.notify(event);
+                } catch (HyracksDataException e) {
+                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
+                }
+                if (subscriber.isDone()) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public AlgebricksAbsolutePartitionConstraint getLocations() {
+        return locations;
+    }
+
+    public ResultReader getResultReader() {
+        return resultReader;
+    }
+
+    public PrecompiledType getType() {
+        return type;
+    }
+
+    public void storeDistributedInfo(DeployedJobSpecId deployedJobSpecId, ScheduledExecutorService ses,
+            IHyracksDataset hdc, ResultSetId resultSetId) {
+        this.deployedJobSpecId = deployedJobSpecId;
+        this.executorService = ses;
+        this.hdc = hdc;
+        this.resultSetId = resultSetId;
+    }
+
+    public ScheduledExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void deActivate() {
+        state = ActivityState.STOPPED;
+    }
+
+    @Override
+    public void notify(ActiveEvent event) {
+        try {
+            switch (event.getEventKind()) {
+                case JOB_STARTED:
+                    handleJobStartEvent(event);
+                    break;
+                case JOB_FINISHED:
+                    handleJobFinishEvent(event);
+                    break;
+                default:
+                    break;
+
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unhandled Exception", e);
+        }
+    }
+
+    @Override
+    public void refreshStats(long l) throws HyracksDataException {
+        // no op
+    }
+
+    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Deployed job started for " + entityId);
+        }
+        state = ActivityState.RUNNING;
+    }
+
+    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Deployed job finished for " + entityId);
+        }
+    }
+
+    @Override
+    public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
+        // no op
+    }
+
+    @Override
+    public boolean isActive() {
+        return state == ActivityState.RUNNING;
+    }
+
+    @Override
+    public void unregister() throws HyracksDataException {
+    }
+
+    @Override
+    public Exception getJobFailure() {
+        return null;
+    }
+
+    @Override
+    public String getDisplayName() throws HyracksDataException {
+        return this.entityId.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
deleted file mode 100644
index 5036549..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventSubscriber;
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.active.message.ActivePartitionMessage.Event;
-import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.metadata.IDataset;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-public class PrecompiledJobEventListener implements IActiveEntityEventsListener {
-
-    private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
-
-    public enum PrecompiledType {
-        CHANNEL,
-        QUERY,
-        INSERT,
-        DELETE
-    }
-
-    enum RequestState {
-        INIT,
-        STARTED,
-        FINISHED
-    }
-
-    private ScheduledExecutorService executorService = null;
-    private ResultReader resultReader;
-    private final PrecompiledType type;
-    // members
-    protected volatile ActivityState state;
-    protected JobId jobId;
-    protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
-    protected final ICcApplicationContext appCtx;
-    protected final EntityId entityId;
-    protected final List<IDataset> datasets;
-    protected final ActiveEvent statsUpdatedEvent;
-    protected long statsTimestamp;
-    protected String stats;
-    protected RequestState statsRequestState;
-    protected final String runtimeName;
-    protected final AlgebricksAbsolutePartitionConstraint locations;
-    protected int numRegistered;
-
-    public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
-            List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
-        this.appCtx = appCtx;
-        this.entityId = entityId;
-        this.datasets = datasets;
-        this.state = ActivityState.STOPPED;
-        this.statsTimestamp = -1;
-        this.statsRequestState = RequestState.INIT;
-        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
-        this.stats = "{\"Stats\":\"N/A\"}";
-        this.runtimeName = runtimeName;
-        this.locations = locations;
-        this.numRegistered = 0;
-        state = ActivityState.STOPPED;
-        this.type = type;
-    }
-
-    protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == Event.RUNTIME_REGISTERED) {
-            numRegistered++;
-            if (numRegistered == locations.getLocations().length) {
-                state = ActivityState.RUNNING;
-            }
-        }
-    }
-
-    @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    @Override
-    public ActivityState getState() {
-        return state;
-    }
-
-    @Override
-    public boolean isEntityUsingDataset(IDataset dataset) {
-        return datasets.contains(dataset);
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    @Override
-    public String getStats() {
-        return stats;
-    }
-
-    @Override
-    public long getStatsTimeStamp() {
-        return statsTimestamp;
-    }
-
-    public String formatStats(List<String> responses) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append("{\"Stats\": [").append(responses.get(0));
-        for (int i = 1; i < responses.size(); i++) {
-            strBuilder.append(", ").append(responses.get(i));
-        }
-        strBuilder.append("]}");
-        return strBuilder.toString();
-    }
-
-    protected synchronized void notifySubscribers(ActiveEvent event) {
-        notifyAll();
-        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
-        while (it.hasNext()) {
-            IActiveEntityEventSubscriber subscriber = it.next();
-            if (subscriber.isDone()) {
-                it.remove();
-            } else {
-                try {
-                    subscriber.notify(event);
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
-                }
-                if (subscriber.isDone()) {
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    public AlgebricksAbsolutePartitionConstraint getLocations() {
-        return locations;
-    }
-
-    public ResultReader getResultReader() {
-        return resultReader;
-    }
-
-    public PrecompiledType getType() {
-        return type;
-    }
-
-    public void storeDistributedInfo(JobId jobId, ScheduledExecutorService ses, ResultReader resultReader) {
-        this.jobId = jobId;
-        this.executorService = ses;
-        this.resultReader = resultReader;
-    }
-
-    public ScheduledExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void deActivate() {
-        state = ActivityState.STOPPED;
-    }
-
-    @Override
-    public void notify(ActiveEvent event) {
-        try {
-            switch (event.getEventKind()) {
-                case JOB_STARTED:
-                    handleJobStartEvent(event);
-                    break;
-                case JOB_FINISHED:
-                    handleJobFinishEvent(event);
-                    break;
-                default:
-                    break;
-
-            }
-        } catch (Exception e) {
-            LOGGER.error("Unhandled Exception", e);
-        }
-    }
-
-    @Override
-    public void refreshStats(long l) throws HyracksDataException {
-        // no op
-    }
-
-    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Channel Job started for  " + entityId);
-        }
-        state = ActivityState.RUNNING;
-    }
-
-    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Channel Job finished for  " + entityId);
-        }
-    }
-
-    @Override
-    public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
-        // no op
-    }
-
-    @Override
-    public boolean isActive() {
-        return state == ActivityState.RUNNING;
-    }
-
-    @Override
-    public void unregister() throws HyracksDataException {
-    }
-
-    @Override
-    public Exception getJobFailure() {
-        return null;
-    }
-
-    @Override
-    public String getDisplayName() throws HyracksDataException {
-        return this.entityId.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/asterix-build-configuration.xml b/asterix-bad/src/main/resources/asterix-build-configuration.xml
new file mode 100644
index 0000000..6007416
--- /dev/null
+++ b/asterix-bad/src/main/resources/asterix-build-configuration.xml
@@ -0,0 +1,110 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+  <metadataNode>asterix_nc1</metadataNode>
+  <store>
+    <ncId>asterix_nc1</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <store>
+    <ncId>asterix_nc2</ncId>
+    <storeDirs>iodevice0,iodevice1</storeDirs>
+  </store>
+  <transactionLogDir>
+    <ncId>asterix_nc1</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
+  </transactionLogDir>
+  <transactionLogDir>
+    <ncId>asterix_nc2</ncId>
+    <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
+  </transactionLogDir>
+  <extensions>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
+    </extension>
+    <extension>
+      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
+    </extension>
+  </extensions>
+  <property>
+    <name>max.wait.active.cluster</name>
+    <value>60</value>
+    <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
+      nodes are available)
+      before a submitted query/statement can be
+      executed. (Default = 60 seconds)
+    </description>
+  </property>
+  <property>
+    <name>log.level</name>
+    <value>WARNING</value>
+    <description>Log level for running tests/build</description>
+  </property>
+  <property>
+    <name>compiler.framesize</name>
+    <value>32KB</value>
+  </property>
+  <property>
+    <name>compiler.sortmemory</name>
+    <value>640KB</value>
+  </property>
+  <property>
+    <name>compiler.groupmemory</name>
+    <value>640KB</value>
+  </property>
+  <property>
+    <name>compiler.joinmemory</name>
+    <value>640KB</value>
+  </property>
+  <property>
+    <name>compiler.pregelix.home</name>
+    <value>~/pregelix</value>
+  </property>
+  <property>
+    <name>storage.buffercache.pagesize</name>
+    <value>32768</value>
+    <description>The page size in bytes for pages in the buffer cache.
+      (Default = "32768" // 32KB)
+    </description>
+  </property>
+  <property>
+    <name>storage.buffercache.size</name>
+    <value>33554432</value>
+    <description>The size of memory allocated to the disk buffer cache.
+      The value should be a multiple of the buffer cache page size (Default
+      = "33554432" // 32MB)
+    </description>
+  </property>
+  <property>
+    <name>storage.memorycomponent.numpages</name>
+    <value>8</value>
+    <description>The number of pages to allocate for a memory component.
+      (Default = 8)
+    </description>
+  </property>
+  <property>
+    <name>plot.activate</name>
+    <value>false</value>
+    <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+    </description>
+  </property>
+</asterixConfiguration>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 6a06ea6..7c5931c 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -101,16 +101,15 @@ CreateChannelStatement ChannelSpecification() throws ParseException:
   CreateChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
-  boolean distributed = true;
 }
 {
   (
     "repetitive" "channel"  nameComponents = QualifiedName()
     <USING> appliedFunction = FunctionSignature()
-    "period" period = FunctionCallExpr() ("nondistributed" { distributed = false; })?
+    "period" period = FunctionCallExpr()
     {
       ccs = new CreateChannelStatement(nameComponents.first,
-                                   nameComponents.second, appliedFunction, period, distributed);
+                                   nameComponents.second, appliedFunction, period);
     }
   )
     {
@@ -124,6 +123,7 @@ CreateProcedureStatement ProcedureSpecification() throws ParseException:
   FunctionName fctName = null;
   FunctionSignature signature;
   List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+  List<Integer> paramIds = new ArrayList<Integer>();
   String functionBody;
   Token beginPos;
   Token endPos;
@@ -135,7 +135,13 @@ CreateProcedureStatement ProcedureSpecification() throws ParseException:
      paramList = ParameterList()
     <LEFTBRACE>
   {
-     beginPos = token;
+    for (VarIdentifier param : paramList)
+    {
+      VarIdentifier v = new VarIdentifier(param.toString());
+      getCurrentScope().addNewVarSymbolToScope(v);
+      paramIds.add(v.getId());
+    }
+    beginPos = token;
   }
   functionBodyExpr = SingleStatement() <RIGHTBRACE>
     {
@@ -146,7 +152,7 @@ CreateProcedureStatement ProcedureSpecification() throws ParseException:
     }
   ("period" period = FunctionCallExpr())?
   {
-  return new CreateProcedureStatement(signature, paramList, functionBody, period);
+  return new CreateProcedureStatement(signature, paramList, paramIds, functionBody, functionBodyExpr, period);
   }
 }
 
@@ -176,7 +182,7 @@ ExecuteProcedureStatement ProcedureExecution() throws ParseException:
   )*)? <RIGHTPAREN>
     {
       String fqFunctionName =  funcName.function;
-      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, arity);
+      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, arity, argList);
     }
 }
 

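A note on the surface syntax enabled by the grammar changes above: procedure creation now takes a
parameter list (each parameter is registered in scope and assigned a parameter id), and procedure
execution now takes an argument list, with an optional trailing "period" clause for repetitive
procedures. The following is only a rough sketch of that syntax; the procedure, dataset, field,
and argument names are illustrative and not taken from this change:

  create procedure deleteOldUserLocations(userName) {
    delete from UserLocations l
    where l.userName = userName
  };

  create procedure recentLocationCount() {
    select count(*) as cnt from UserLocations l
    where l.timeStamp > current_datetime() - day_time_duration("PT10S")
  } period duration("PT10M");

  execute deleteOldUserLocations("alice");

Arguments to execute are expected to line up positionally with the declared parameters (matching
the arity captured above), and the period clause stays optional, per the
("period" period = FunctionCallExpr())? production.
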
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..65c184c
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADAsterixHyracksIntegrationUtil.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class BADAsterixHyracksIntegrationUtil extends AsterixHyracksIntegrationUtil {
+
+    public static void main(String[] args) throws Exception {
+        BADAsterixHyracksIntegrationUtil integrationUtil = new BADAsterixHyracksIntegrationUtil();
+        try {
+            integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
+                    System.getProperty("external.lib", ""));
+        } catch (Exception e) {
+            // surface the startup failure before exiting so the cause is not lost
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    @Override
+    protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs) throws Exception {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    deinit(cleanupOnShutdown);
+                } catch (Exception e) {
+                    // best-effort cleanup during JVM shutdown; ignore failures
+                }
+            }
+        });
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
+                "asterixdb/asterix-opt/asterix-bad/src/main/resources/asterix-build-configuration.xml");
+
+        init(cleanupOnStart);
+        while (true) {
+            Thread.sleep(10000);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
index d3ec0ba..57c9738 100644
--- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
@@ -48,7 +48,7 @@ public class BADExecutionTest {
     protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
             File.separator);
 
-    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/asterix-build-configuration.xml";
 
     protected static TransactionProperties txnProperties;
     private static final TestExecutor testExecutor = new TestExecutor();

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
index ad2f1bf..36ba481 100644
--- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
@@ -38,7 +38,7 @@ public class BADOptimizerTest extends OptimizerTest {
 
     @BeforeClass
     public static void setUp() throws Exception {
-        TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
+        TEST_CONFIG_FILE_NAME = "src/main/resources/asterix-build-configuration.xml";
         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
         final File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
deleted file mode 100644
index 6007416..0000000
--- a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<asterixConfiguration xmlns="asterixconf">
-  <metadataNode>asterix_nc1</metadataNode>
-  <store>
-    <ncId>asterix_nc1</ncId>
-    <storeDirs>iodevice0,iodevice1</storeDirs>
-  </store>
-  <store>
-    <ncId>asterix_nc2</ncId>
-    <storeDirs>iodevice0,iodevice1</storeDirs>
-  </store>
-  <transactionLogDir>
-    <ncId>asterix_nc1</ncId>
-    <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
-  </transactionLogDir>
-  <transactionLogDir>
-    <ncId>asterix_nc2</ncId>
-    <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
-  </transactionLogDir>
-  <extensions>
-    <extension>
-      <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
-    </extension>
-    <extension>
-      <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
-    </extension>
-    <extension>
-      <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
-    </extension>
-  </extensions>
-  <property>
-    <name>max.wait.active.cluster</name>
-    <value>60</value>
-    <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
-      nodes are available)
-      before a submitted query/statement can be
-      executed. (Default = 60 seconds)
-    </description>
-  </property>
-  <property>
-    <name>log.level</name>
-    <value>WARNING</value>
-    <description>Log level for running tests/build</description>
-  </property>
-  <property>
-    <name>compiler.framesize</name>
-    <value>32KB</value>
-  </property>
-  <property>
-    <name>compiler.sortmemory</name>
-    <value>640KB</value>
-  </property>
-  <property>
-    <name>compiler.groupmemory</name>
-    <value>640KB</value>
-  </property>
-  <property>
-    <name>compiler.joinmemory</name>
-    <value>640KB</value>
-  </property>
-  <property>
-    <name>compiler.pregelix.home</name>
-    <value>~/pregelix</value>
-  </property>
-  <property>
-    <name>storage.buffercache.pagesize</name>
-    <value>32768</value>
-    <description>The page size in bytes for pages in the buffer cache.
-      (Default = "32768" // 32KB)
-    </description>
-  </property>
-  <property>
-    <name>storage.buffercache.size</name>
-    <value>33554432</value>
-    <description>The size of memory allocated to the disk buffer cache.
-      The value should be a multiple of the buffer cache page size(Default
-      = "33554432" // 32MB)
-    </description>
-  </property>
-  <property>
-    <name>storage.memorycomponent.numpages</name>
-    <value>8</value>
-    <description>The number of pages to allocate for a memory component.
-      (Default = 8)
-    </description>
-  </property>
-  <property>
-    <name>plot.activate</name>
-    <value>false</value>
-    <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
-    </description>
-  </property>
-</asterixConfiguration>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/8b53ce55/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp
new file mode 100644
index 0000000..53399e1
--- /dev/null
+++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-advanced.sqlpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Check an advanced channel for index usage
+ * Expected Res : Success
+ * Date         : Oct 2017
+ */
+
+drop dataverse channels5 if exists;
+create dataverse channels5;
+use channels5;
+
+create type UserLocation as closed {
+  recordId: uuid,
+  location: circle,
+  userName: string,
+  timeStamp: datetime
+};
+
+create type EmergencyReport as closed {
+  reportId: uuid,
+  Etype: string,
+  location: circle,
+  timeStamp: datetime
+};
+
+create type EmergencyShelter as closed {
+  shelterName: string,
+  location: circle
+};
+
+create dataset UserLocations(UserLocation)
+primary key recordId autogenerated;
+
+create dataset EmergencyReports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index locTimes on UserLocations(timeStamp);
+create index repTimes on EmergencyReports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  SELECT r AS report
+  FROM
+  (select value r from EmergencyReports r where r.timeStamp > current_datetime() - day_time_duration("PT10S")) r,
+  (select value l from UserLocations l where l.timeStamp > current_datetime() - day_time_duration("PT10S")) l
+  where l.userName = userName
+  and spatial_intersect(r.location,l.location)
+  )
+};
+
+write output to nc1:"rttest/channel-advanced.sqlpp";
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
\ No newline at end of file