You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2017/03/01 20:08:19 UTC

[1/2] asterixdb-bad git commit: Added Procedures to BAD

Repository: asterixdb-bad
Updated Branches:
  refs/heads/master 3dcf57c4d -> 79226b576


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
deleted file mode 100644
index 1bbe331..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.EnumSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
-import org.apache.asterix.bad.ChannelJobService;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
-
-    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
-    private final JobSpecification jobSpec;
-    private long duration;
-    private final HyracksConnection hcc;
-
-    public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
-            JobSpecification channeljobSpec, String duration, String strIP, int port) throws HyracksDataException {
-        super(ctx, runtimeId);
-        this.jobSpec = channeljobSpec;
-        this.duration = ChannelJobService.findPeriod(duration);
-        try {
-            hcc = new HyracksConnection(strIP, port);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-
-    @Override
-    protected void start() throws HyracksDataException, InterruptedException {
-        try {
-            scheduledExecutorService =
-                    ChannelJobService.startJob(jobSpec, EnumSet.noneOf(JobFlag.class), null, hcc, duration);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        while (!scheduledExecutorService.isTerminated()) {
-
-        }
-
-    }
-
-    @Override
-    protected void abort() throws HyracksDataException, InterruptedException {
-        scheduledExecutorService.shutdown();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 94b4c78..b433f5f 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -5,6 +5,8 @@ import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
 import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
 import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
 import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
+import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
+import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
 
 
 @merge
@@ -18,7 +20,7 @@ Statement SingleStatement() throws ParseException:
   (
     // merge area 2
     before:
-    after:    | stmt = ChannelSubscriptionStatement())
+    after:    | stmt = ChannelSubscriptionStatement() | stmt = ProcedureExecution())
   {
     // merge area 3
   }
@@ -60,6 +62,10 @@ Statement DropStatement() throws ParseException:
       {
         stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);	
       }
+              | "procedure" funcSig = FunctionSignature() ifExists = IfExists()
+      {
+        stmt = new ProcedureDropStatement(funcSig, ifExists);
+      }
       )
   {
     // merge area 3
@@ -74,13 +80,13 @@ CreateChannelStatement ChannelSpecification() throws ParseException:
   CreateChannelStatement ccs = null;
   String fqFunctionName = null;
   Expression period = null;
-  boolean distributed = false;
+  boolean distributed = true;
 }
 {
   (
     "repetitive" "channel"  nameComponents = QualifiedName()
     <USING> appliedFunction = FunctionSignature()
-    "period" period = FunctionCallExpr() ("distributed" { distributed = true; })?
+    "period" period = FunctionCallExpr() ("nondistributed" { distributed = false; })?
     {
       ccs = new CreateChannelStatement(nameComponents.first,
                                    nameComponents.second, appliedFunction, period, distributed);
@@ -91,37 +97,67 @@ CreateChannelStatement ChannelSpecification() throws ParseException:
     }
 }
 
-
 @new
 CreateProcedureStatement ProcedureSpecification() throws ParseException:
 {
-  Pair<Identifier,Identifier> nameComponents = null;
+  FunctionName fctName = null;
   FunctionSignature signature;
   List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
   String functionBody;
   Token beginPos;
   Token endPos;
-  Expression functionBodyExpr;
+  Statement functionBodyExpr;
+  Expression period = null;
 }
 {
-    "procedure" nameComponents = QualifiedName()
+     "procedure" fctName = FunctionName()
      paramList = ParameterList()
     <LEFTBRACE>
   {
      beginPos = token;
   }
-  functionBodyExpr = Expression() <RIGHTBRACE>
+  functionBodyExpr = SingleStatement() <RIGHTBRACE>
     {
       endPos = token;
       functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
-      signature = new FunctionSignature(nameComponents.first.toString(), nameComponents.second.toString(), paramList.size());
+      signature = new FunctionSignature(fctName.dataverse, fctName.function, paramList.size());
       removeCurrentScope();
-      return new CreateProcedureStatement(signature, paramList, functionBody);
     }
+  ("period" period = FunctionCallExpr())?
+  {
+  return new CreateProcedureStatement(signature, paramList, functionBody, period);
+  }
 }
 
-
-
+@new
+ExecuteProcedureStatement ProcedureExecution() throws ParseException:
+{
+  ExecuteProcedureStatement callExpr;
+  List<Expression> argList = new ArrayList<Expression>();
+  Expression tmp;
+  int arity = 0;
+  FunctionName funcName = null;
+  String hint = null;
+}
+{
+  "execute"
+  funcName = FunctionName()
+  <LEFTPAREN> (tmp = Expression()
+    {
+      argList.add(tmp);
+      arity ++;
+    }
+  (<COMMA> tmp = Expression()
+    {
+      argList.add(tmp);
+      arity++;
+    }
+  )*)? <RIGHTPAREN>
+    {
+      String fqFunctionName =  funcName.function;
+      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, arity);
+    }
+}
 
 @new
 CreateBrokerStatement BrokerSpecification() throws ParseException:

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
new file mode 100644
index 0000000..be1fc86
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
@@ -0,0 +1,27 @@
+/*
+* Description  : Simple Delete Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+  timeStamp: datetime,
+  roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+insert into dataset UserLocations([
+  {"timeStamp":current-datetime(), "roomNumber":222}]
+);
+
+create procedure deleteAll() {
+delete $i from dataset UserLocations
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
new file mode 100644
index 0000000..0a0c582
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description  : Simple Delete Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
new file mode 100644
index 0000000..e9adaa8
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Delete Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute deleteAll();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
new file mode 100644
index 0000000..0a0c582
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description  : Simple Delete Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
new file mode 100644
index 0000000..1110d94
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+  timeStamp: datetime,
+  roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+create procedure findMe() {
+  insert into dataset UserLocations([
+    {"timeStamp":current-datetime(), "roomNumber":222}]
+  )
+};

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.ddl.aql
new file mode 100644
index 0000000..a229f81
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.ddl.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+drop procedure findMe@0;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.6.query.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.6.query.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.6.query.aql
new file mode 100644
index 0000000..df9cc77
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.6.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description  : Simple Insert Procedure
+* Expected Res : 3
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
new file mode 100644
index 0000000..eaebfbd
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+* Description  : Simple Query Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+  timeStamp: datetime,
+  roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+create procedure findMe() {
+for $i in dataset UserLocations
+order by $i.timeStamp
+return $i.roomNumber
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
new file mode 100644
index 0000000..2f0e968
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
@@ -0,0 +1,12 @@
+/*
+* Description  : Simple Query Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+
+insert into dataset UserLocations([
+  {"timeStamp":current-datetime(), "roomNumber":222}]
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
new file mode 100644
index 0000000..824d026
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Query Procedure
+* Expected Res : 222
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
new file mode 100644
index 0000000..199d0fc
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
@@ -0,0 +1,11 @@
+/*
+* Description  : Simple Query Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+insert into dataset UserLocations([
+  {"timeStamp":current-datetime(), "roomNumber":225}]
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
new file mode 100644
index 0000000..25e0ba5
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Simple Query Procedure
+* Expected Res : 222,225
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
new file mode 100644
index 0000000..f4d45c9
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+* Description  : Repetitive Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+  timeStamp: datetime,
+  roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+create procedure findMe() {
+  insert into dataset UserLocations([
+    {"timeStamp":current-datetime(), "roomNumber":222}]
+  )
+} period duration("PT5S");

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Repetitive Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
new file mode 100644
index 0000000..8adb253
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
@@ -0,0 +1,8 @@
+/*
+* Description  : Repetitive Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+11000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
new file mode 100644
index 0000000..3b6e6c4
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
@@ -0,0 +1,9 @@
+/*
+* Description  : Repetitive Insert Procedure
+* Expected Res : Success
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+drop procedure findMe@0;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
new file mode 100644
index 0000000..8a1872c
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description  : Repetitive Insert Procedure
+* Expected Res : 2
+* Date         : Jan 2017
+* Author       : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.6.adm
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.6.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.6.adm
new file mode 100644
index 0000000..e440e5c
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.6.adm
@@ -0,0 +1 @@
+3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
new file mode 100644
index 0000000..6dd90d2
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
@@ -0,0 +1 @@
+222
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
new file mode 100644
index 0000000..b1d4fa9
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
@@ -0,0 +1,2 @@
+222
+225
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 997dc77..d9ade1f 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -8,6 +8,26 @@
             <output-dir compare="Text">room_occupants</output-dir>
         </compilation-unit>
     </test-case>
+    <test-case FilePath="procedure">
+        <compilation-unit name="insert_procedure">
+            <output-dir compare="Text">insert_procedure</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
+        <compilation-unit name="delete_procedure">
+            <output-dir compare="Text">delete_procedure</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
+        <compilation-unit name="query_procedure">
+            <output-dir compare="Text">query_procedure</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
+        <compilation-unit name="repetitive_insert_procedure">
+            <output-dir compare="Text">repetitive_insert_procedure</output-dir>
+        </compilation-unit>
+    </test-case>
     <test-case FilePath="channel">
         <compilation-unit name="create_channel_check_datasets">
             <output-dir compare="Text">create_channel_check_datasets</output-dir>


[2/2] asterixdb-bad git commit: Added Procedures to BAD

Posted by sj...@apache.org.
Added Procedures to BAD

Change-Id: I03550a74e2c90179e72345103b3d2c4f98148631


Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/79226b57
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/79226b57
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/79226b57

Branch: refs/heads/master
Commit: 79226b5769ee0960ba0fd207315e4a25a17c1812
Parents: 3dcf57c
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Thu Feb 23 21:13:42 2017 -0800
Committer: Steven Glenn Jacobs <sj...@ucr.edu>
Committed: Thu Feb 23 21:13:42 2017 -0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 asterix-bad/pom.xml                             |   5 -
 .../org/apache/asterix/bad/ChannelJobInfo.java  |  48 ----
 .../apache/asterix/bad/ChannelJobService.java   |   2 +-
 .../asterix/bad/lang/BADLangExtension.java      |   7 +
 .../bad/lang/BADQueryTranslatorFactory.java     |   3 +-
 .../asterix/bad/lang/BADStatementExecutor.java  |  15 +-
 .../lang/statement/ChannelDropStatement.java    |  51 +----
 .../statement/ChannelUnsubscribeStatement.java  |   2 +-
 .../lang/statement/CreateChannelStatement.java  | 121 +++-------
 .../statement/CreateProcedureStatement.java     | 111 ++++++---
 .../statement/ExecuteProcedureStatement.java    | 143 ++++++++++++
 .../lang/statement/ProcedureDropStatement.java  | 131 +++++++++++
 .../bad/metadata/BADMetadataExtension.java      |   3 +-
 .../bad/metadata/BADMetadataRecordTypes.java    |   5 +-
 .../bad/metadata/ChannelEventsListener.java     | 229 -------------------
 .../metadata/DataverseProceduresSearchKey.java  |  43 ++++
 .../metadata/PrecompiledJobEventListener.java   | 127 ++++++++++
 .../apache/asterix/bad/metadata/Procedure.java  |   8 +-
 .../bad/metadata/ProcedureTupleTranslator.java  |  12 +-
 .../RepetitiveChannelOperatorDescriptor.java    |  78 -------
 .../RepetitiveChannelOperatorNodePushable.java  |  76 ------
 .../src/main/resources/lang-extension/lang.txt  |  60 ++++-
 .../delete_procedure/delete_procedure.1.ddl.aql |  27 +++
 .../delete_procedure.2.query.aql                |  10 +
 .../delete_procedure.3.update.aql               |   9 +
 .../delete_procedure.4.query.aql                |  10 +
 .../insert_procedure/insert_procedure.1.ddl.aql |  25 ++
 .../insert_procedure.2.update.aql               |   9 +
 .../insert_procedure.3.update.aql               |   9 +
 .../insert_procedure.4.update.aql               |   9 +
 .../insert_procedure/insert_procedure.5.ddl.aql |   9 +
 .../insert_procedure.6.query.aql                |  10 +
 .../query_procedure/query_procedure.1.ddl.aql   |  25 ++
 .../query_procedure.2.update.aql                |  12 +
 .../query_procedure.3.update.aql                |   9 +
 .../query_procedure.4.update.aql                |  11 +
 .../query_procedure.5.update.aql                |   9 +
 .../repetitive_insert_procedure.1.ddl.aql       |  25 ++
 .../repetitive_insert_procedure.2.update.aql    |   9 +
 .../repetitive_insert_procedure.3.sleep.aql     |   8 +
 .../repetitive_insert_procedure.4.ddl.aql       |   9 +
 .../repetitive_insert_procedure.5.query.aql     |  10 +
 .../delete_procedure/delete_procedure.2.adm     |   1 +
 .../delete_procedure/delete_procedure.4.adm     |   1 +
 .../insert_procedure/insert_procedure.6.adm     |   1 +
 .../query_procedure/query_procedure.3.adm       |   1 +
 .../query_procedure/query_procedure.5.adm       |   2 +
 .../repetitive_insert_procedure.5.adm           |   1 +
 .../src/test/resources/runtimets/testsuite.xml  |  20 ++
 50 files changed, 946 insertions(+), 616 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 00a7ae5..76ad872 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ git.properties
 build
 *.iml
 .idea/*
+ClusterControllerService

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index 9065680..0d32652 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -247,11 +247,6 @@
       <version>${hyracks.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-dataflow-std</artifactId>
-      <version>${hyracks.version}</version>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.12</version>

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
deleted file mode 100644
index da0c43b..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-import java.util.List;
-
-import org.apache.asterix.active.ActiveJob;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.BADConstants.ChannelJobType;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ChannelJobInfo extends ActiveJob {
-
-    private static final long serialVersionUID = 1L;
-    private List<String> locations;
-
-    public ChannelJobInfo(EntityId entityId, JobId jobId, ActivityState state, JobSpecification spec) {
-        super(entityId, jobId, state, ChannelJobType.REPETITIVE, spec);
-    }
-
-    public List<String> getLocations() {
-        return locations;
-
-    }
-
-    public void setLocations(List<String> locations) {
-        this.locations = locations;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index d1df438..eba8ca1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -70,7 +70,7 @@ public class ChannelJobService {
         if (jobId == null) {
             hcc.startJob(jobSpec, jobFlags);
         } else {
-            hcc.startJob(jobSpec, jobFlags, jobId);
+            hcc.startJob(jobId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
index 959600f..1d46bc4 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -27,6 +27,7 @@ import org.apache.asterix.bad.metadata.Channel;
 import org.apache.asterix.bad.metadata.ChannelSearchKey;
 import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey;
 import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey;
+import org.apache.asterix.bad.metadata.DataverseProceduresSearchKey;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.bad.metadata.ProcedureSearchKey;
 import org.apache.asterix.common.api.ExtensionId;
@@ -119,4 +120,10 @@ public class BADLangExtension implements ILangExtension {
         return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey);
     }
 
+    public static List<Procedure> getProcedures(MetadataTransactionContext mdTxnCtx, String dataverseName)
+            throws AlgebricksException {
+        DataverseProceduresSearchKey proceduresSearchKey = new DataverseProceduresSearchKey(dataverseName);
+        return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 8960c70..2a11e13 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -32,6 +32,7 @@ public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory {
     @Override
     public QueryTranslator create(List<Statement> statements, SessionConfig conf,
             ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
-        return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider);
+        return new BADStatementExecutor(statements, conf, compilationProvider, storageComponentProvider,
+                executorService);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 7e73d86..9be02f6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -19,13 +19,17 @@
 package org.apache.asterix.bad.lang;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
 import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -39,8 +43,9 @@ import org.apache.hyracks.api.client.IHyracksClientConnection;
 public class BADStatementExecutor extends QueryTranslator {
 
     public BADStatementExecutor(List<Statement> aqlStatements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider) {
-        super(aqlStatements, conf, compliationProvider, storageComponentProvider);
+            ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
+            ExecutorService executorService) {
+        super(aqlStatements, conf, compliationProvider, storageComponentProvider, executorService);
     }
 
 
@@ -64,6 +69,12 @@ public class BADStatementExecutor extends QueryTranslator {
                     new Identifier(channel.getChannelId().getEntityName()), false);
             drop.handle(this, metadataProvider, hcc, null, null, null, 0);
         }
+        List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue());
+        for (Procedure procedure : procedures) {
+            ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(),
+                    procedure.getEntityId().getEntityName(), procedure.getArity()), false);
+            drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+        }
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index b41d4a1..89b0e9a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,42 +18,29 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 
 public class ChannelDropStatement implements IExtensionStatement {
 
@@ -102,10 +89,8 @@ public class ChannelDropStatement implements IExtensionStatement {
         String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
-        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        boolean subscriberRegistered = false;
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -122,29 +107,14 @@ public class ChannelDropStatement implements IExtensionStatement {
                     throw new AlgebricksException("There is no channel with this name " + channelName + ".");
                 }
             }
-            if (listener != null) {
-                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
-            }
-            if (!subscriberRegistered) {
-                throw new AsterixException("Channel " + channelName + " is not running");
-            }
 
-            ICCMessageBroker messageBroker =
-                    (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
-
-            ChannelJobInfo cInfo = listener.getJobInfo(channel.getChannelId());;
-            Set<String> ncs = new HashSet<>(cInfo.getLocations());
-            AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
-                    ncs.toArray(new String[ncs.size()]));
-            int partition = 0;
-            for (String location : locations.getLocations()) {
-                messageBroker.sendApplicationMessageToNC(
-                        new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
-                                new ActiveRuntimeId(channel.getChannelId(),
-                                        RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
-                        location);
+            listener.getExecutorService().shutdownNow();
+            JobId hyracksJobId = listener.getJobId();
+            listener.deActivate();
+            ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+            if (hyracksJobId != null) {
+                hcc.destroyJob(hyracksJobId);
             }
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);
 
             //Drop the Channel Datasets
             //TODO: Need to find some way to handle if this fails.
@@ -157,9 +127,6 @@ public class ChannelDropStatement implements IExtensionStatement {
                     new Identifier(channel.getSubscriptionsDataset()), true);
             ((QueryTranslator) statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
 
-            if (subscriberRegistered) {
-                listener.deregisterEventSubscriber(eventSubscriber);
-            }
 
             //Remove the Channel Metadata
             MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index fa57503..1558508 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -156,7 +156,7 @@ public class ChannelUnsubscribeStatement implements IExtensionStatement {
             AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
             delete.accept(visitor, null);
 
-            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
+            ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc, false);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             QueryTranslator.abort(e, e, mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 1ea8e7f..362c8bf 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -22,35 +22,28 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.StringReader;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
 import org.apache.asterix.bad.ChannelJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.utils.JobUtils;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
@@ -69,21 +62,11 @@ import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.runtime.utils.AppContextInfo;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.application.ICCApplicationContext;
-import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -183,36 +166,6 @@ public class CreateChannelStatement implements IExtensionStatement {
         return Kind.EXTENSION;
     }
 
-    public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverse,
-            String channelName, String duration, MetadataProvider metadataProvider, JobSpecification channeljobSpec,
-            String strIP, int port) throws Exception {
-        JobSpecification spec = RuntimeUtils.createJobSpecification();
-        IOperatorDescriptor channelQueryExecuter;
-        AlgebricksPartitionConstraint executerPc;
-
-        Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> p = buildChannelRuntime(spec, dataverse,
-                channelName, duration, channeljobSpec, strIP, port);
-        channelQueryExecuter = p.first;
-        executerPc = p.second;
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, channelQueryExecuter, executerPc);
-        spec.addRoot(channelQueryExecuter);
-        return new Pair<>(spec, p.second);
-
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> buildChannelRuntime(
-            JobSpecification jobSpec, String dataverse, String channelName, String duration,
-            JobSpecification channeljobSpec, String strIP, int port) throws Exception {
-        RepetitiveChannelOperatorDescriptor channelOp = new RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
-                channelName, duration, channeljobSpec, strIP, port);
-
-        String partition = ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
-        Set<String> ncs = new HashSet<>(Arrays.asList(partition));
-        AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
-                ncs.toArray(new String[ncs.size()]));
-        return new Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>(channelOp, partitionConstraint);
-    }
-
     private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
             Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
             IHyracksDataset hdc, Stats stats, String dataverse) throws AsterixException, Exception {
@@ -281,28 +234,20 @@ public class CreateChannelStatement implements IExtensionStatement {
                 hcc, hdc, ResultDelivery.ASYNC, stats, true);
     }
 
-    private void setupCompiledJob(MetadataProvider metadataProvider, String dataverse, EntityId entityId,
-            JobSpecification channeljobSpec, IHyracksClientConnection hcc) throws Exception {
-        ICCApplicationContext iCCApp = AppContextInfo.INSTANCE.getCCApplicationContext();
-        ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
-        String strIP = ccInfo.getClientNetAddress();
-        int port = ccInfo.getClientNetPort();
-        //Create Channel Operator
-        Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(dataverse,
-                channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP, port);
-
-        ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, alteredJobSpec.first);
-        alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
-        JobUtils.runJob(hcc, alteredJobSpec.first, false);
-    }
-
-    private void setupDistributedJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc)
+    private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
+            PrecompiledJobEventListener listener, boolean predistributed)
             throws Exception {
-        ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE, channeljobSpec);
-        channeljobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
-        JobId jobId = hcc.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB));
-        ChannelJobService.startJob(channeljobSpec, EnumSet.of(JobFlag.STORE_JOB), jobId, hcc,
-                ChannelJobService.findPeriod(duration));
+        if (channeljobSpec != null) {
+            //TODO: Find a way to fix optimizer tests so we don't need this check
+            JobId jobId = null;
+            if (predistributed) {
+                jobId = hcc.distributeJob(channeljobSpec);
+            }
+            ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
+                    jobId, hcc, ChannelJobService.findPeriod(duration));
+            listener.storeDistributedInfo(jobId, ses, null, null);
+        }
+
     }
 
     @Override
@@ -324,10 +269,9 @@ public class CreateChannelStatement implements IExtensionStatement {
         Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
         Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
         EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
-        ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        boolean subscriberRegistered = false;
+        boolean alreadyActive = false;
         Channel channel = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -339,9 +283,9 @@ public class CreateChannelStatement implements IExtensionStatement {
                 throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
             }
             if (listener != null) {
-                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+                alreadyActive = listener.isEntityActive();
             }
-            if (subscriberRegistered) {
+            if (alreadyActive) {
                 throw new AsterixException("Channel " + channelName + " is already running");
             }
             initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
@@ -356,14 +300,6 @@ public class CreateChannelStatement implements IExtensionStatement {
                 throw new AsterixException("The channel name:" + channelName + " is not available.");
             }
 
-            // Now we subscribe
-            if (listener == null) {
-                listener = new ChannelEventsListener(entityId);
-                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
-            }
-            listener.registerEventSubscriber(eventSubscriber);
-            subscriberRegistered = true;
-
             //Create Channel Datasets
             createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats,
                     dataverse);
@@ -372,12 +308,21 @@ public class CreateChannelStatement implements IExtensionStatement {
             JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
                     metadataProvider, hcc, hdc, stats, dataverse);
 
+            // Now we subscribe
+            if (listener == null) {
+                List<IDataset> datasets = new ArrayList<>();
+                datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()));
+                datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
+                //TODO: Add datasets used by channel function
+                listener = new PrecompiledJobEventListener(entityId, PrecompiledType.CHANNEL, datasets);
+                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            }
+
             if (distributed) {
-                setupDistributedJob(entityId, channeljobSpec, hcc);
+                setupExecutorJob(entityId, channeljobSpec, hcc, listener, true);
             } else {
-                setupCompiledJob(metadataProvider, dataverse, entityId, channeljobSpec, hcc);
+                setupExecutorJob(entityId, channeljobSpec, hcc, listener, false);
             }
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 66dd2ae..71e11dc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
@@ -27,34 +29,45 @@ import java.util.logging.Logger;
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
+import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
 public class CreateProcedureStatement implements IExtensionStatement {
 
@@ -63,6 +76,8 @@ public class CreateProcedureStatement implements IExtensionStatement {
     private final FunctionSignature signature;
     private final String functionBody;
     private final List<String> paramList;
+    private final CallExpr period;
+    private String duration = "";
 
     public FunctionSignature getaAterixFunction() {
         return signature;
@@ -72,14 +87,15 @@ public class CreateProcedureStatement implements IExtensionStatement {
         return functionBody;
     }
 
-    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
-            String functionBody) {
+    public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList, String functionBody,
+            Expression period) {
         this.signature = signature;
         this.functionBody = functionBody;
         this.paramList = new ArrayList<String>();
         for (VarIdentifier varId : parameterList) {
             this.paramList.add(varId.getValue());
         }
+        this.period = (CallExpr) period;
     }
 
     @Override
@@ -100,12 +116,32 @@ public class CreateProcedureStatement implements IExtensionStatement {
         return Category.DDL;
     }
 
+    /**
+     * Returns the period expression supplied to the constructor, or {@code null} if none was given.
+     */
+    public Expression getPeriod() {
+        return period;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return null;
     }
 
-    private JobSpecification createProcedureJob(String body, IStatementExecutor statementExecutor,
+    /**
+     * Validates the optional period expression and records its duration text.
+     * <p>
+     * When no period was supplied this is a no-op and {@code duration} keeps its initial value of {@code ""}.
+     * Otherwise the period must be a call to {@code duration} whose first argument is a string literal; that
+     * string is stored in {@code duration} and run through the duration parser.
+     *
+     * @throws MetadataException if the period is not a {@code duration} call with a string-literal argument
+     * @throws HyracksDataException propagated from the duration parser
+     */
+    private void initialize() throws MetadataException, HyracksDataException {
+        if (period == null) {
+            return;
+        }
+        String functionName = period.getFunctionSignature().getName();
+        if (!functionName.equals("duration")) {
+            throw new MetadataException("Expected argument period as a duration, but got " + functionName + ".");
+        }
+        // Check the argument shape up front so that a call such as duration() or duration(5) is reported as a
+        // metadata error rather than surfacing as an IndexOutOfBoundsException or ClassCastException.
+        List<Expression> arguments = period.getExprList();
+        if (arguments.isEmpty() || !(arguments.get(0) instanceof LiteralExpr)
+                || !(((LiteralExpr) arguments.get(0)).getValue() instanceof StringLiteral)) {
+            throw new MetadataException("Expected the duration of the period to be a string literal.");
+        }
+        duration = ((StringLiteral) ((LiteralExpr) arguments.get(0)).getValue()).getValue();
+        char[] durationChars = duration.toCharArray();
+        IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
+        // The parsed output is written to a throwaway buffer; presumably the parse is run only so that a
+        // malformed duration string is rejected here -- TODO confirm.
+        durationParser.parse(durationChars, 0, durationChars.length,
+                new DataOutputStream(new ByteArrayOutputStream()));
+    }
+
+    private Pair<JobSpecification, PrecompiledType> createProcedureJob(String body,
+            IStatementExecutor statementExecutor,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats)
                     throws Exception {
         StringBuilder builder = new StringBuilder();
@@ -114,10 +150,29 @@ public class CreateProcedureStatement implements IExtensionStatement {
         AQLParserFactory aqlFact = new AQLParserFactory();
         List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
         if (fStatements.size() > 1) {
-            throw new Exception("Procedure can only execute a single statement");
+            throw new CompilationException("Procedure can only execute a single statement");
+        }
+        if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
+            return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+                    fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats, true), PrecompiledType.INSERT);
+        } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
+            return new Pair<>(((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider,
+                    (Query) fStatements.get(0), null), PrecompiledType.QUERY);
+        } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
+            AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
+            fStatements.get(0).accept(visitor, null);
+            return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
+                    fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+        }else{
+            throw new CompilationException("Procedure can only execute a single delete, insert, or query");
         }
-        return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
-                hcc, hdc, ResultDelivery.ASYNC, stats, true);
+    }
+
+    private void setupDistributedJob(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc,
+            PrecompiledJobEventListener listener, MetadataProvider metadataProvider, IHyracksDataset hdc, Stats stats)
+                    throws Exception {
+        JobId jobId = hcc.distributeJob(jobSpec);
+        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc), metadataProvider.getResultSetId());
     }
 
     @Override
@@ -125,14 +180,15 @@ public class CreateProcedureStatement implements IExtensionStatement {
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
             int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
 
+        initialize();
+
         String dataverse =
                 ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
 
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
-        ChannelEventsListener listener =
-                (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
-        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
-        boolean subscriberRegistered = false;
+        PrecompiledJobEventListener listener =
+                (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+        boolean alreadyActive = false;
         Procedure procedure = null;
 
         MetadataTransactionContext mdTxnCtx = null;
@@ -145,31 +201,30 @@ public class CreateProcedureStatement implements IExtensionStatement {
                 throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
             }
             if (listener != null) {
-                subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+                alreadyActive = listener.isEntityActive();
             }
-            if (subscriberRegistered) {
+            if (alreadyActive) {
                 throw new AsterixException("Procedure " + signature.getName() + " is already running");
             }
 
             procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(),
-                    Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL);
-
-            // Now we subscribe
-            if (listener == null) {
-                listener = new ChannelEventsListener(entityId);
-                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
-            }
-            listener.registerEventSubscriber(eventSubscriber);
-            subscriberRegistered = true;
+                    Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
 
+            metadataProvider.setResultSetId(new ResultSetId(0));
+            metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
 
             //Create Procedure Internal Job
-            JobSpecification channeljobSpec =
+            Pair<JobSpecification, PrecompiledType> procedureJobSpec =
                     createProcedureJob(getFunctionBody(), statementExecutor, metadataProvider, hcc, hdc, stats);
 
-            // setupDistributedJob(entityId, channeljobSpec, hcc);
+            // Now we subscribe
+            if (listener == null) {
+                //TODO: Add datasets used by channel function
+                listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>());
+                ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+            }
 
-            eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+            setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, metadataProvider, hdc, stats);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
new file mode 100644
index 0000000..f333cba
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.EnumSet;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Extension statement that runs a procedure identified by dataverse, name and arity.
+ * <p>
+ * A procedure whose stored duration is empty is started once; any other duration causes the job to be handed
+ * to {@link ChannelJobService} together with the period derived from that duration.
+ */
+public class ExecuteProcedureStatement implements IExtensionStatement {
+
+    private final String dataverseName;
+    private final String procedureName;
+    private final int arity;
+
+    /**
+     * @param dataverseName dataverse named in the statement; resolved against the active dataverse in
+     *            {@link #handle}
+     * @param procedureName name of the procedure to run
+     * @param arity number of parameters, used together with the name to look the procedure up
+     */
+    public ExecuteProcedureStatement(String dataverseName, String procedureName, int arity) {
+        this.dataverseName = dataverseName;
+        this.procedureName = procedureName;
+        this.arity = arity;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getProcedureName() {
+        return procedureName;
+    }
+
+    public int getArity() {
+        return arity;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    /** Visitors are not supported by this statement; always returns {@code null}. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return null;
+    }
+
+    /**
+     * Looks the procedure up inside a metadata transaction and runs its job.
+     * <p>
+     * For an empty duration the job is started once and, when the listener's type is
+     * {@link PrecompiledType#QUERY}, this method waits for completion and prints the results. Otherwise the
+     * job is started through {@link ChannelJobService} and the returned executor is stored on the listener.
+     *
+     * @throws HyracksDataException wrapping any failure, including a missing procedure or a missing listener
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
+        EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
+                .getActiveEntityListener(entityId);
+
+        boolean txnActive = false;
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            txnActive = true;
+            Procedure procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, procedureName,
+                    Integer.toString(getArity()));
+            if (procedure == null) {
+                throw new AlgebricksException("There is no procedure with this name " + procedureName + ".");
+            }
+            if (listener == null) {
+                // The lookup above can return null when no listener is registered for this entity; report
+                // that explicitly instead of failing with a NullPointerException below.
+                throw new AlgebricksException(
+                        "Procedure " + procedureName + " has no registered job to execute.");
+            }
+
+            JobId hyracksJobId = listener.getJobId();
+            if (procedure.getDuration().isEmpty()) {
+                // No period: run the job a single time.
+                hcc.startJob(hyracksJobId);
+
+                if (listener.getType() == PrecompiledType.QUERY) {
+                    hcc.waitForCompletion(hyracksJobId);
+                    ResultReader resultReader = listener.getResultReader();
+                    resultReader.open(hyracksJobId, listener.getResultSetId());
+                    ResultUtil.printResults(resultReader, ((QueryTranslator) statementExecutor).getSessionConfig(),
+                            new Stats(), null);
+                }
+
+            } else {
+                // NOTE(review): an executor already stored on the listener by an earlier execution is
+                // overwritten here without being shut down -- confirm whether that is intended.
+                ScheduledExecutorService ses = ChannelJobService.startJob(null, EnumSet.noneOf(JobFlag.class),
+                        hyracksJobId, hcc, ChannelJobService.findPeriod(procedure.getDuration()));
+                listener.storeDistributedInfo(hyracksJobId, ses, listener.getResultReader(), listener.getResultSetId());
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            txnActive = false;
+        } catch (Exception e) {
+            if (txnActive) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            // The cause is preserved on the rethrown exception, so no separate stack-trace dump is needed.
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
new file mode 100644
index 0000000..9fe8a83
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * Extension statement that drops a procedure: it stops and removes the procedure's listener and distributed
+ * job, then deletes the procedure's metadata entry.
+ */
+public class ProcedureDropStatement implements IExtensionStatement {
+
+    private final FunctionSignature signature;
+    private final boolean ifExists;
+
+    /**
+     * @param signature namespace, name and arity of the procedure to drop
+     * @param ifExists when {@code true}, dropping a procedure that does not exist is not an error
+     */
+    public ProcedureDropStatement(FunctionSignature signature, boolean ifExists) {
+        this.signature = signature;
+        this.ifExists = ifExists;
+    }
+
+    public FunctionSignature getFunctionSignature() {
+        return signature;
+    }
+
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    /** Visitors are not supported by this statement; always returns {@code null}. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return null;
+    }
+
+    /**
+     * Drops the procedure inside a metadata transaction.
+     * <p>
+     * The signature's namespace is first replaced by the resolved active dataverse. If the procedure does not
+     * exist the call either returns quietly ({@code ifExists}) or fails. Otherwise any executor held by the
+     * listener is shut down, the listener is deactivated and unregistered, the distributed job is destroyed,
+     * and finally the metadata entry is deleted.
+     *
+     * @throws HyracksDataException wrapping any failure; the metadata transaction is aborted first if it is
+     *             still open
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
+        String dataverse =
+                ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
+        signature.setNamespace(dataverse);
+
+        EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
+        PrecompiledJobEventListener listener = (PrecompiledJobEventListener) ActiveJobNotificationHandler.INSTANCE
+                .getActiveEntityListener(entityId);
+
+        boolean txnActive = false;
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            // Stays true until the commit succeeds so that every failure in between aborts the transaction.
+            txnActive = true;
+            Procedure procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+                    Integer.toString(signature.getArity()));
+            if (procedure == null) {
+                if (ifExists) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    txnActive = false;
+                    return;
+                } else {
+                    throw new AlgebricksException("There is no procedure with this name " + signature.getName() + ".");
+                }
+            }
+
+            // The listener lookup can return null; in that case there is nothing to stop, and the metadata
+            // entry can still be removed.
+            if (listener != null) {
+                if (listener.getExecutorService() != null) {
+                    listener.getExecutorService().shutdownNow();
+                }
+                // Read the job id before deactivating and unregistering the listener.
+                JobId hyracksJobId = listener.getJobId();
+                listener.deActivate();
+                ActiveJobNotificationHandler.INSTANCE.removeListener(listener);
+                if (hyracksJobId != null) {
+                    hcc.destroyJob(hyracksJobId);
+                }
+            }
+
+            //Remove the Procedure Metadata
+            MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, procedure);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            txnActive = false;
+        } catch (Exception e) {
+            if (txnActive) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            // The cause is preserved on the rethrown exception, so no separate stack-trace dump is needed.
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index 5ba303c..1a8eda1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -80,7 +80,8 @@ public class BADMetadataExtension implements IMetadataExtension {
     @Override
     public List<ExtensionMetadataDataset> getExtensionIndexes() {
         try {
-            return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET);
+            return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET,
+                    BADMetadataIndexes.PROCEDURE_DATASET);
         } catch (Throwable th) {
             th.printStackTrace();
             throw th;

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index 6ee5735..0430118 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -83,17 +83,18 @@ public class BADMetadataRecordTypes {
     public static final int PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX = 4;
     public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX = 5;
     public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX = 6;
+    public static final int PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX = 7;
     public static final ARecordType PROCEDURE_RECORDTYPE = MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_PROCEDURE,
             // FieldNames
             new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY,
                     BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_RETURN_TYPE,
-                    BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE },
+                    BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING,
-                    BuiltinType.ASTRING },
+                    BuiltinType.ASTRING, BuiltinType.ASTRING },
             //IsOpen?
             true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
deleted file mode 100644
index 1c812a0..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveJob;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.runtime.utils.AppContextInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.log4j.Logger;
-
-public class ChannelEventsListener implements IActiveEntityEventsListener {
-    private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);
-    private final List<IActiveLifecycleEventSubscriber> subscribers;
-    private final Map<Long, ActiveJob> jobs;
-    private final Map<EntityId, ChannelJobInfo> jobInfos;
-    private EntityId entityId;
-
-    public ChannelEventsListener(EntityId entityId) {
-        this.entityId = entityId;
-        subscribers = new ArrayList<>();
-        jobs = new HashMap<>();
-        jobInfos = new HashMap<>();
-    }
-
-    @Override
-    public void notify(ActiveEvent event) {
-        try {
-            switch (event.getEventKind()) {
-                case JOB_START:
-                    handleJobStartEvent(event);
-                    break;
-                case JOB_FINISH:
-                    handleJobFinishEvent(event);
-                    break;
-                case PARTITION_EVENT:
-                    LOGGER.warn("Partition Channel Event");
-                    break;
-                default:
-                    break;
-
-            }
-        } catch (Exception e) {
-            LOGGER.error("Unhandled Exception", e);
-        }
-    }
-
-    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
-        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
-        handleJobStartMessage((ChannelJobInfo) jobInfo);
-    }
-
-    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
-        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Channel Job finished for  " + jobInfo);
-        }
-        handleJobFinishMessage((ChannelJobInfo) jobInfo);
-    }
-
-    private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
-        EntityId channelJobId = cInfo.getEntityId();
-
-        IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
-        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
-        JobStatus status = info.getStatus();
-        boolean failure = status != null && status.equals(JobStatus.FAILURE);
-
-        jobInfos.remove(channelJobId);
-        jobs.remove(cInfo.getJobId().getId());
-        // notify event listeners
-        ActiveLifecycleEvent event = failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED
-                : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
-        notifyEventSubscribers(event);
-    }
-
-    private void notifyEventSubscribers(ActiveLifecycleEvent event) {
-        if (subscribers != null && !subscribers.isEmpty()) {
-            for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
-                subscriber.handleEvent(event);
-            }
-        }
-    }
-
-    private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
-        List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
-        Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-            IOperatorDescriptor opDesc = entry.getValue();
-            if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
-                channelOperatorIds.add(opDesc.getOperatorId());
-            }
-        }
-
-        IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
-        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
-        List<String> locations = new ArrayList<>();
-        for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
-            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
-            int nOperatorInstances = operatorLocations.size();
-            for (int i = 0; i < nOperatorInstances; i++) {
-                locations.add(operatorLocations.get(i));
-            }
-        }
-        cInfo.setLocations(locations);
-        cInfo.setState(ActivityState.ACTIVE);
-    }
-
-    @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
-        try {
-            registerJob(jobId, spec);
-            return;
-
-        } catch (Exception e) {
-            LOGGER.error(e);
-        }
-    }
-
-    public synchronized void registerJob(JobId jobId, JobSpecification jobSpec) {
-        if (jobs.get(jobId.getId()) != null) {
-            throw new IllegalStateException("Channel job already registered");
-        }
-        if (jobInfos.containsKey(jobId.getId())) {
-            throw new IllegalStateException("Channel job already registered");
-        }
-
-        ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
-        jobs.put(jobId.getId(), cInfo);
-        jobInfos.put(entityId, cInfo);
-
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
-        }
-
-        notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
-
-    }
-
-    public JobSpecification getJobSpecification(EntityId activeJobId) {
-        return jobInfos.get(activeJobId).getSpec();
-    }
-
-    public ChannelJobInfo getJobInfo(EntityId activeJobId) {
-        return jobInfos.get(activeJobId);
-    }
-
-    public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
-        subscribers.add(subscriber);
-    }
-
-    public void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
-        subscribers.remove(subscriber);
-    }
-
-    public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
-        boolean active = false;
-        ChannelJobInfo cInfo = jobInfos.get(activeJobId);
-        if (cInfo != null) {
-            active = cInfo.getState().equals(ActivityState.ACTIVE);
-        }
-        if (active) {
-            registerEventSubscriber(eventSubscriber);
-        }
-        return active;
-    }
-
-    public FeedConnectionId[] getConnections() {
-        return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
-    }
-
-    @Override
-    public boolean isEntityActive() {
-        return !jobs.isEmpty();
-    }
-
-    @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    @Override
-    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
-        if (entityId.getDataverse().equals(dataverseName)) {
-            String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
-            String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
-            if (datasetName.equals(subscriptionsName) || datasetName.equals(resultsName)) {
-                return true;
-            }
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
new file mode 100644
index 0000000..9699e21
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+/** Extension-metadata search key that targets the BAD procedure index using a dataverse name. */
+public class DataverseProceduresSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+    private final String dataverse; // sole component of the search tuple
+    /** @param dataverse value later wrapped into the search tuple; stored as-is, no null check */
+    public DataverseProceduresSearchKey(String dataverse) {
+        this.dataverse = dataverse;
+    }
+    /** @return the fixed id BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+    }
+    /** @return a tuple built by MetadataNode.createTuple from the dataverse name alone */
+    @Override
+    public ITupleReference getSearchKey() {
+        return MetadataNode.createTuple(dataverse);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
new file mode 100644
index 0000000..7c22dc5
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.log4j.Logger;
+/** Events listener for one active entity: records its PrecompiledType, STARTED/STOPPED state and job handles. */
+public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
+    private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
+
+    private ScheduledExecutorService executorService = null; // assigned only by storeDistributedInfo
+    private ResultReader resultReader; // assigned only by storeDistributedInfo
+    private ResultSetId resultSetId; // assigned only by storeDistributedInfo
+    /** Type tag supplied at construction; this class only stores it and returns it from getType(). */
+    public enum PrecompiledType {
+        CHANNEL,
+        QUERY,
+        INSERT,
+        DELETE
+    }
+
+    private final PrecompiledType type;
+    /** Stores the given entity id, datasets and type, and puts the listener in the STOPPED state. */
+    public PrecompiledJobEventListener(EntityId entityId, PrecompiledType type, List<IDataset> datasets) {
+        this.entityId = entityId;
+        this.datasets = datasets;
+        state = ActivityState.STOPPED; // not declared here; presumably a field inherited from the superclass
+        this.type = type;
+    }
+    /** @return the reader passed to storeDistributedInfo, or null if that was never called */
+    public ResultReader getResultReader() {
+        return resultReader;
+    }
+    /** @return the result set id passed to storeDistributedInfo, or null if that was never called */
+    public ResultSetId getResultSetId() {
+        return resultSetId;
+    }
+    /** @return the type given at construction */
+    public PrecompiledType getType() {
+        return type;
+    }
+    /** Records the job id, executor, result reader and result set id; each call overwrites the previous values. */
+    public void storeDistributedInfo(JobId jobId, ScheduledExecutorService ses, ResultReader resultReader,
+            ResultSetId resultSetId) {
+        this.jobId = jobId;
+        this.executorService = ses;
+        this.resultReader = resultReader;
+        this.resultSetId = resultSetId;
+    }
+    /** @return the executor passed to storeDistributedInfo, or null if that was never called */
+    public ScheduledExecutorService getExecutorService() {
+        return executorService;
+    }
+    /** @return true iff state is STARTED (set here on JOB_STARTED, reset to STOPPED by deActivate()) */
+    public boolean isEntityActive() {
+        return state == ActivityState.STARTED;
+    }
+    /** Sets the state back to STOPPED; note that a JOB_FINISHED event does not do this. */
+    public void deActivate() {
+        state = ActivityState.STOPPED;
+    }
+    /** Dispatches JOB_STARTED/JOB_FINISHED events; other kinds are ignored and handler exceptions are only logged. */
+    @Override
+    public void notify(ActiveEvent event) {
+        try {
+            switch (event.getEventKind()) {
+                case JOB_STARTED:
+                    handleJobStartEvent(event);
+                    break;
+                case JOB_FINISHED:
+                    handleJobFinishEvent(event);
+                    break;
+                default:
+                    break;
+
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unhandled Exception", e);
+        }
+    }
+    /** Logs the start at INFO level and marks the entity STARTED; the event argument is not inspected. */
+    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Channel Job started for  " + entityId);
+        }
+        state = ActivityState.STARTED;
+    }
+    /** Logs the finish at INFO level only; the state is left unchanged and the event argument is not inspected. */
+    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Channel Job finished for  " + entityId);
+        }
+    }
+    /** Always returns null, regardless of the requested state. */
+    @Override
+    public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index b64bf1b..a77a14d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -39,15 +39,17 @@ public class Procedure implements IExtensionMetadataEntity {
     private final String body;
     private final String returnType;
     private final String language;
+    private final String duration;
 
     public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType,
-            String functionBody, String language) {
+            String functionBody, String language, String duration) {
         this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName);
         this.params = params;
         this.body = functionBody;
         this.returnType = returnType == null ? RETURNTYPE_VOID : returnType;
         this.language = language;
         this.arity = arity;
+        this.duration = duration;
     }
 
     public EntityId getEntityId() {
@@ -74,6 +76,10 @@ public class Procedure implements IExtensionMetadataEntity {
         return arity;
     }
 
+    public String getDuration() {
+        return duration;
+    }
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
index f2eab9b..e151aea 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -104,8 +104,12 @@ public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure>
                 .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX))
                         .getStringValue();
 
+        String duration = ((AString) procedureRecord
+                .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX))
+                        .getStringValue();
+
         return new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition,
-                language);
+                language, duration);
 
     }
 
@@ -178,6 +182,12 @@ public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure>
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX, fieldValue);
 
+        // write field 7
+        fieldValue.reset();
+        aString.setValue(procedure.getDuration());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX, fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();

http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/79226b57/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
deleted file mode 100644
index 8093977..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * A repetitive channel operator, which uses a Java timer to run a given query periodically
- */
-public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
-
-    /** The unique identifier of the job. **/
-    protected final EntityId entityId;
-
-    protected final JobSpecification jobSpec;
-
-    private final String duration;
-
-    private String strIP;
-    private int port;
-
-    public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
-            String duration, JobSpecification channeljobSpec, String strIP, int port) {
-        super(spec, 0, 0);
-        this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
-        this.jobSpec = channeljobSpec;
-        this.duration = duration;
-        this.strIP = strIP;
-        this.port = port;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
-                RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition);
-        return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
-    }
-
-    public String getDuration() {
-        return duration;
-    }
-
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    public JobSpecification getJobSpec() {
-        return jobSpec;
-    }
-
-}