You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 13:34:09 UTC

[7/7] asterixdb git commit: Refactor General Active Classes

Refactor General Active Classes

This change is the first of a series of changes aiming to refactor
feed related classes into generic active and feed specific.
In this change, we introduce asterix-active which will be one level
below asterix-external-data. The ultimate goal is to have a very
flexible framewrok for long running activities within asterix.

Change-Id: I0a8f33ee5b45c5e090b08c24a102e369aae43c04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/977
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/973a0d34
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/973a0d34
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/973a0d34

Branch: refs/heads/master
Commit: 973a0d34e36de2fb950f4979843b3bef9f1f992d
Parents: 3044e2e
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Fri Jul 22 14:38:56 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Jul 22 06:32:59 2016 -0700

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |  45 ++
 .../org/apache/asterix/active/ActiveEvent.java  |  68 ++
 .../org/apache/asterix/active/ActiveJob.java    |  92 +++
 .../active/ActiveJobNotificationHandler.java    | 194 ++++++
 .../apache/asterix/active/ActiveManager.java    |  70 ++
 .../asterix/active/ActivePartitionMessage.java  |  55 ++
 .../apache/asterix/active/ActiveRuntime.java    |  39 ++
 .../apache/asterix/active/ActiveRuntimeId.java  |  77 +++
 .../asterix/active/ActiveRuntimeManager.java    |  84 +++
 .../asterix/active/ActiveRuntimeRegistry.java   |  79 +++
 .../org/apache/asterix/active/Activity.java     |  80 +++
 .../apache/asterix/active/ActivityState.java    |  28 +
 .../asterix/active/ConcurrentFramePool.java     | 292 ++++++++
 .../org/apache/asterix/active/EntityId.java     |  76 +++
 .../org/apache/asterix/active/FrameAction.java  |  70 ++
 .../active/IActiveEntityEventsListener.java     |  34 +
 .../apache/asterix/active/IActiveMessage.java   |  44 ++
 .../apache/asterix/active/IActiveRuntime.java   |  28 +
 .../asterix/active/IActiveRuntimeRegistry.java  |  49 ++
 .../optimizer/rules/UnnestToDataScanRule.java   |   2 +-
 .../translator/AbstractLangTranslator.java      |  18 +-
 .../asterix/translator/CompiledStatements.java  |  66 +-
 .../LangExpressionToPlanTranslator.java         | 403 +++++------
 .../apache/asterix/api/common/APIFramework.java |  20 +-
 .../api/common/AsterixAppRuntimeContext.java    |  37 +-
 .../asterix/api/http/servlet/AQLAPIServlet.java |  15 +-
 .../asterix/api/http/servlet/DDLAPIServlet.java |  17 +-
 .../asterix/api/http/servlet/FeedServlet.java   |   6 +-
 .../api/http/servlet/QueryAPIServlet.java       |  10 +-
 .../api/http/servlet/RESTAPIServlet.java        |   7 +-
 .../api/http/servlet/UpdateAPIServlet.java      |  10 +-
 .../app/external/ActiveLifecycleListener.java   |  82 +++
 .../external/FeedJobNotificationHandler.java    | 679 -------------------
 .../apache/asterix/app/external/FeedJoint.java  |  27 +-
 .../app/external/FeedLifecycleListener.java     | 228 -------
 .../asterix/app/external/FeedOperations.java    |  42 +-
 .../app/external/FeedWorkCollection.java        |   5 +-
 .../asterix/aql/translator/QueryTranslator.java | 656 +++++++++---------
 .../bootstrap/CCApplicationEntryPoint.java      |   5 +-
 .../asterix/messaging/CCMessageBroker.java      |  13 +-
 .../asterix/test/sqlpp/ParserTestExecutor.java  |   7 +-
 .../messaging/api/IApplicationMessage.java      |   2 +-
 asterixdb/asterix-external-data/pom.xml         |  25 +-
 .../external/feed/api/FeedOperationCounter.java |  12 +-
 .../apache/asterix/external/feed/api/IFeed.java |   4 +-
 .../feed/api/IFeedConnectionManager.java        |  75 --
 .../asterix/external/feed/api/IFeedJoint.java   |   9 +-
 .../feed/api/IFeedLifecycleListener.java        |  49 --
 .../asterix/external/feed/api/IFeedMessage.java |  41 --
 .../asterix/external/feed/api/IFeedRuntime.java |  47 --
 .../external/feed/api/ISubscribableRuntime.java |   3 +-
 .../dataflow/DistributeFeedFrameWriter.java     |  12 +-
 .../feed/dataflow/FeedRuntimeInputHandler.java  |   9 +-
 .../external/feed/dataflow/FrameAction.java     |  70 --
 .../feed/management/ConcurrentFramePool.java    | 293 --------
 .../feed/management/FeedCollectInfo.java        |  52 --
 .../feed/management/FeedConnectionId.java       |  13 +-
 .../feed/management/FeedConnectionManager.java  | 107 ---
 .../feed/management/FeedConnectionRequest.java  |  14 +-
 .../feed/management/FeedEventsListener.java     | 644 ++++++++++++++++++
 .../external/feed/management/FeedId.java        |  66 --
 .../external/feed/management/FeedJointKey.java  |  14 +-
 .../external/feed/management/FeedManager.java   |  97 ---
 .../feed/management/FeedRuntimeManager.java     |  83 ---
 .../external/feed/message/EndFeedMessage.java   |  22 +-
 .../external/feed/message/FeedMessage.java      |   4 +-
 .../feed/message/FeedPartitionStartMessage.java |  48 --
 .../feed/runtime/AdapterRuntimeManager.java     |   8 +-
 .../feed/runtime/CollectionRuntime.java         |  17 +-
 .../external/feed/runtime/FeedRuntime.java      |  41 --
 .../external/feed/runtime/FeedRuntimeId.java    |  88 ---
 .../external/feed/runtime/IngestionRuntime.java |   5 +-
 .../feed/runtime/SubscribableRuntime.java       |  16 +-
 .../external/feed/watch/FeedConnectJobInfo.java |  11 +-
 .../external/feed/watch/FeedIntakeInfo.java     |  17 +-
 .../external/feed/watch/FeedJobInfo.java        |  86 ---
 .../FeedCollectOperatorDescriptor.java          |  26 +-
 .../FeedCollectOperatorNodePushable.java        |  31 +-
 .../operators/FeedIntakeOperatorDescriptor.java |  13 +-
 .../FeedIntakeOperatorNodePushable.java         |  31 +-
 .../FeedMessageOperatorDescriptor.java          |  13 +-
 .../FeedMessageOperatorNodePushable.java        |  58 +-
 .../operators/FeedMetaComputeNodePushable.java  |  27 +-
 .../operators/FeedMetaOperatorDescriptor.java   |   2 +-
 .../operators/FeedMetaStoreNodePushable.java    |  30 +-
 .../apache/asterix/external/util/FeedUtils.java |  23 +
 .../feed/test/ConcurrentFramePoolUnitTest.java  |   4 +-
 .../external/feed/test/InputHandlerTest.java    |  13 +-
 .../aql/statement/SubscribeFeedStatement.java   |  25 +-
 .../asterix/lang/aql/util/RangeMapBuilder.java  |   6 +-
 .../asterix/lang/common/base/Statement.java     |  85 +--
 .../lang/common/statement/CompactStatement.java |   6 +-
 .../common/statement/ConnectFeedStatement.java  |   6 +-
 .../statement/CreateDataverseStatement.java     |  11 +-
 .../statement/CreateFeedPolicyStatement.java    |   6 +-
 .../common/statement/CreateFeedStatement.java   |   5 +-
 .../statement/CreateFunctionStatement.java      |   6 +-
 .../common/statement/CreateIndexStatement.java  |   6 +-
 .../statement/CreatePrimaryFeedStatement.java   |   6 +-
 .../statement/CreateSecondaryFeedStatement.java |   6 +-
 .../lang/common/statement/DatasetDecl.java      |   6 +-
 .../lang/common/statement/DataverseDecl.java    |   6 +-
 .../statement/DataverseDropStatement.java       |   6 +-
 .../lang/common/statement/DeleteStatement.java  |   6 +-
 .../statement/DisconnectFeedStatement.java      |   6 +-
 .../lang/common/statement/DropStatement.java    |   6 +-
 .../common/statement/FeedDropStatement.java     |   6 +-
 .../statement/FeedPolicyDropStatement.java      |   6 +-
 .../lang/common/statement/FunctionDecl.java     |   6 +-
 .../common/statement/FunctionDropStatement.java |   6 +-
 .../lang/common/statement/IndexDecl.java        |   6 +-
 .../common/statement/IndexDropStatement.java    |   6 +-
 .../lang/common/statement/InsertStatement.java  |   6 +-
 .../lang/common/statement/LoadStatement.java    |   6 +-
 .../statement/NodeGroupDropStatement.java       |   6 +-
 .../lang/common/statement/NodegroupDecl.java    |   6 +-
 .../asterix/lang/common/statement/Query.java    |   6 +-
 .../RefreshExternalDatasetStatement.java        |   6 +-
 .../lang/common/statement/RunStatement.java     |   6 +-
 .../lang/common/statement/SetStatement.java     |   6 +-
 .../asterix/lang/common/statement/TypeDecl.java |   6 +-
 .../common/statement/TypeDropStatement.java     |   6 +-
 .../lang/common/statement/UpdateStatement.java  |   6 +-
 .../lang/common/statement/UpsertStatement.java  |   7 +-
 .../lang/common/statement/WriteStatement.java   |   6 +-
 .../metadata/declared/FeedDataSource.java       |  13 +-
 .../apache/asterix/metadata/entities/Feed.java  |  11 +-
 .../metadata/feeds/FeedMetadataUtil.java        |  48 +-
 .../tools/translator/ADGenDmlTranslator.java    |  12 +-
 asterixdb/pom.xml                               |   1 +
 .../ScalarFunctionCallExpression.java           |   2 +-
 131 files changed, 3411 insertions(+), 3235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
new file mode 100644
index 0000000..140d67a
--- /dev/null
+++ b/asterixdb/asterix-active/pom.xml
@@ -0,0 +1,45 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.asterix</groupId>
+    <artifactId>apache-asterixdb</artifactId>
+    <version>0.8.9-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-active</artifactId>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-test-support</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-runtime</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-compiler</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-hdfs-core</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
new file mode 100644
index 0000000..e5ccd05
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.job.JobId;
+
+public class ActiveEvent {
+
+    private final JobId jobId;
+    private final EntityId entityId;
+    private final Serializable payload;
+    private final EventKind eventKind;
+
+    public enum EventKind {
+        JOB_START,
+        JOB_FINISH,
+        PARTITION_EVENT
+    }
+
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind) {
+        this(jobId, eventKind, null, null);
+    }
+
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId feedId) {
+        this(jobId, eventKind, feedId, null);
+    }
+
+    public ActiveEvent(JobId jobId, ActiveEvent.EventKind eventKind, EntityId feedId, Serializable payload) {
+        this.jobId = jobId;
+        this.eventKind = eventKind;
+        this.entityId = feedId;
+        this.payload = payload;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public EntityId getFeedId() {
+        return entityId;
+    }
+
+    public Serializable getPayload() {
+        return payload;
+    }
+
+    public EventKind getEventKind() {
+        return eventKind;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
new file mode 100644
index 0000000..1e3eca1
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJob.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class ActiveJob implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ActiveJob.class.getName());
+    protected final EntityId entityId;
+    protected JobId jobId;
+    protected final Serializable jobObject;
+    protected ActivityState state;
+    protected JobSpecification spec;
+
+    public ActiveJob(EntityId entityId, JobId jobId, ActivityState state, Serializable jobInfo, JobSpecification spec) {
+        this.entityId = entityId;
+        this.state = state;
+        this.jobId = jobId;
+        this.jobObject = jobInfo;
+        this.spec = spec;
+    }
+
+    public ActiveJob(EntityId entityId, ActivityState state, Serializable jobInfo, JobSpecification spec) {
+        this.entityId = entityId;
+        this.state = state;
+        this.jobObject = jobInfo;
+        this.spec = spec;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    public ActivityState getState() {
+        return state;
+    }
+
+    public void setState(ActivityState state) {
+        this.state = state;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(this + " is in " + state + " state.");
+        }
+    }
+
+    public Object getJobObject() {
+        return jobObject;
+    }
+
+    public JobSpecification getSpec() {
+        return spec;
+    }
+
+    public void setSpec(JobSpecification spec) {
+        this.spec = spec;
+    }
+
+    @Override
+    public String toString() {
+        return jobId + " [" + jobObject + "]";
+    }
+
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
new file mode 100644
index 0000000..9c69aca
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveEvent.EventKind;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class ActiveJobNotificationHandler implements Runnable {
+    public static final ActiveJobNotificationHandler INSTANCE = new ActiveJobNotificationHandler();
+    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
+    private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
+    private static final boolean DEBUG = false;
+    private final LinkedBlockingQueue<ActiveEvent> eventInbox;
+    private final Map<EntityId, IActiveEntityEventsListener> entityEventListener;
+    private final Map<JobId, ActiveJob> jobId2ActiveJobInfos;
+
+    private ActiveJobNotificationHandler() {
+        this.eventInbox = new LinkedBlockingQueue<>();
+        this.jobId2ActiveJobInfos = new HashMap<>();
+        this.entityEventListener = new HashMap<>();
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName());
+        LOGGER.log(Level.INFO, "Started " + ActiveJobNotificationHandler.class.getSimpleName());
+        while (!Thread.interrupted()) {
+            try {
+                ActiveEvent event = getEventInbox().take();
+                ActiveJob jobInfo = jobId2ActiveJobInfos.get(event.getJobId());
+                EntityId entityId = jobInfo.getEntityId();
+                IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+                if (DEBUG) {
+                    LOGGER.log(Level.INFO, "Next event is of type " + event.getEventKind());
+                    LOGGER.log(Level.INFO, "Notifying the listener");
+                }
+                listener.notify(event);
+                if (event.getEventKind() == EventKind.JOB_FINISH) {
+                    removeFinishedJob(event.getJobId());
+                    removeInactiveListener(listener);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Error handling an active job event", e);
+            }
+        }
+        LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName());
+    }
+
+    private void removeFinishedJob(JobId jobId) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "Removing the job");
+        }
+        jobId2ActiveJobInfos.remove(jobId);
+    }
+
+    private void removeInactiveListener(IActiveEntityEventsListener listener) {
+        if (!listener.isEntityActive()) {
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Removing the listener since it is not active anymore");
+            }
+            entityEventListener.remove(listener.getEntityId());
+        }
+    }
+
+    public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+            IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+            LOGGER.log(Level.INFO, "Listener found: " + listener);
+        }
+        return entityEventListener.get(entityId);
+    }
+
+    public synchronized ActiveJob[] getActiveJobs() {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "getActiveJobs()  was called");
+            LOGGER.log(Level.INFO, "Number of jobs found: " + jobId2ActiveJobInfos.size());
+        }
+        return jobId2ActiveJobInfos.values().toArray(new ActiveJob[jobId2ActiveJobInfos.size()]);
+    }
+
+    public boolean isActiveJob(JobId jobId) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "isActiveJob(JobId jobId) called with jobId: " + jobId);
+            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        }
+        return jobId2ActiveJobInfos.get(jobId) != null;
+    }
+
+    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO,
+                    "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
+                            + jobId);
+        }
+        Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
+        if (property == null || !(property instanceof ActiveJob)) {
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Job was is not active. property found to be: " + property);
+            }
+            return;
+        } else {
+            monitorJob(jobId, (ActiveJob) property);
+        }
+        if (DEBUG) {
+            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        }
+        ActiveJob jobInfo = jobId2ActiveJobInfos.get(jobId);
+        if (jobInfo != null) {
+            EntityId entityId = jobInfo.getEntityId();
+            IActiveEntityEventsListener listener = entityEventListener.get(entityId);
+            listener.notifyJobCreation(jobId, jobSpecification);
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Listener was notified" + jobId);
+            }
+        } else {
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "Listener was not notified since it was not registered for the job " + jobId);
+            }
+        }
+    }
+
+    public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
+        return eventInbox;
+    }
+
+    public synchronized IActiveEntityEventsListener[] getEventListeners() {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "getEventListeners() was called");
+            LOGGER.log(Level.INFO, "returning " + entityEventListener.size() + " Listeners");
+        }
+        return entityEventListener.values().toArray(new IActiveEntityEventsListener[entityEventListener.size()]);
+    }
+
+    public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
+                    + listener.getEntityId());
+        }
+        if (entityEventListener.containsKey(listener.getEntityId())) {
+            throw new HyracksDataException(
+                    "Active Entity Listener " + listener.getEntityId() + " is already registered");
+        }
+        entityEventListener.put(listener.getEntityId(), listener);
+    }
+
+    public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
+        if (DEBUG) {
+            LOGGER.log(Level.INFO, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            LOGGER.log(Level.INFO, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        }
+        if (entityEventListener.containsKey(activeJob.getEntityId())) {
+            if (jobId2ActiveJobInfos.containsKey(jobId)) {
+                LOGGER.severe("Job is already being monitored for job: " + jobId);
+                return;
+            }
+            if (DEBUG) {
+                LOGGER.log(Level.INFO, "monitoring started for job id: " + jobId);
+            }
+            jobId2ActiveJobInfos.put(jobId, activeJob);
+        } else {
+            LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
new file mode 100644
index 0000000..5992294
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ActiveManager {
+
+    private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
+
+    private final IActiveRuntimeRegistry activeRuntimeRegistry;
+
+    private final ConcurrentFramePool activeFramePool;
+
+    private final String nodeId;
+
+    public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException {
+        this.nodeId = nodeId;
+        this.activeRuntimeRegistry = new ActiveRuntimeRegistry(nodeId);
+        this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
+        this.runtimes = new ConcurrentHashMap<>();
+    }
+
+    public IActiveRuntimeRegistry getActiveRuntimeRegistry() {
+        return activeRuntimeRegistry;
+    }
+
+    public ConcurrentFramePool getFramePool() {
+        return activeFramePool;
+    }
+
+    public void registerRuntime(IActiveRuntime runtime) {
+        ActiveRuntimeId id = runtime.getRuntimeId();
+        if (!runtimes.containsKey(id)) {
+            runtimes.put(id, runtime);
+        }
+    }
+
+    public void deregisterRuntime(ActiveRuntimeId id) {
+        runtimes.remove(id);
+    }
+
+    public IActiveRuntime getSubscribableRuntime(ActiveRuntimeId subscribableRuntimeId) {
+        return runtimes.get(subscribableRuntimeId);
+    }
+
+    @Override
+    public String toString() {
+        return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
new file mode 100644
index 0000000..8875647
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivePartitionMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.job.JobId;
+
+public class ActivePartitionMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final EntityId feedId;
+    private final JobId jobId;
+    private final Serializable payload;
+
+    public ActivePartitionMessage(EntityId feedId, JobId jobId, Serializable payload) {
+        this.feedId = feedId;
+        this.jobId = jobId;
+        this.payload = payload;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.ACTIVE_ENTITY_MESSAGE;
+    }
+
+    public EntityId getFeedId() {
+        return feedId;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public Serializable getPayload() {
+        return payload;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
new file mode 100644
index 0000000..8b0914a
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntime.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class ActiveRuntime implements IActiveRuntime {
+
+    /** A unique identifier for the runtime **/
+    protected final ActiveRuntimeId runtimeId;
+
+    public ActiveRuntime(ActiveRuntimeId runtimeId) {
+        this.runtimeId = runtimeId;;
+    }
+
+    @Override
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    @Override
+    public String toString() {
+        return runtimeId.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
new file mode 100644
index 0000000..64926fd
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeId.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+public class ActiveRuntimeId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final EntityId entityId;
+    private final String runtimeId;
+    private final int partition;
+    private final int hashCode;
+
+    public ActiveRuntimeId(EntityId entityId, String runtimeId, int partition) {
+        this.entityId = entityId;
+        this.runtimeId = runtimeId;
+        this.partition = partition;
+        this.hashCode = toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "(" + entityId + ")" + "[" + partition + "]:" + runtimeId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ActiveRuntimeId)) {
+            return false;
+        }
+        ActiveRuntimeId other = (ActiveRuntimeId) o;
+        return other.entityId.equals(entityId) && other.getFeedRuntimeType().equals(runtimeId)
+                && other.getPartition() == partition;
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    public String getFeedRuntimeType() {
+        return runtimeId;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public String getRuntimeType() {
+        return runtimeId;
+    }
+
+    public EntityId getFeedId() {
+        return entityId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
new file mode 100644
index 0000000..9743856
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ActiveRuntimeManager {
+
+    private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
+    private final Map<ActiveRuntimeId, ActiveRuntime> activeRuntimes;
+
+    private final ExecutorService executorService;
+
+    public ActiveRuntimeManager() {
+        this.activeRuntimes = new ConcurrentHashMap<>();
+        this.executorService = Executors.newCachedThreadPool();
+    }
+
+    public void close() throws IOException {
+        if (executorService != null) {
+            executorService.shutdown();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Shut down executor service for :" + ActiveRuntimeManager.class.getSimpleName());
+            }
+            try {
+                executorService.awaitTermination(10L, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOGGER.log(Level.SEVERE, ActiveRuntimeManager.class.getSimpleName()
+                        + " was interrupted while waiting for runtime managers to shutdown", e);
+            }
+            if (!executorService.isTerminated()) {
+                LOGGER.severe(ActiveRuntimeManager.class.getSimpleName()
+                        + " failed to shutdown successfully. Will be forced to shutdown");
+                executorService.shutdownNow();
+            }
+        }
+    }
+
+    public ActiveRuntime getFeedRuntime(ActiveRuntimeId runtimeId) {
+        return activeRuntimes.get(runtimeId);
+    }
+
+    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveRuntime feedRuntime) {
+        activeRuntimes.put(runtimeId, feedRuntime);
+    }
+
+    public synchronized void deregisterRuntime(ActiveRuntimeId runtimeId) {
+        activeRuntimes.remove(runtimeId);
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public Set<ActiveRuntimeId> getFeedRuntimes() {
+        return activeRuntimes.keySet();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
new file mode 100644
index 0000000..050426c
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeRegistry.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An implementation of the {@code IActiveRuntimeRegistry} interface.
+ * Provider necessary central repository for registering/retrieving
+ * artifacts/services associated with an active entity.
+ */
+public class ActiveRuntimeRegistry implements IActiveRuntimeRegistry {
+
+    private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeRegistry.class.getName());
+
+    private Map<ActiveRuntimeId, ActiveRuntimeManager> activeRuntimeManagers = new HashMap<>();
+    private final String nodeId;
+
+    public ActiveRuntimeRegistry(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void deregisterRuntime(ActiveRuntimeId runtimeId) {
+        try {
+            ActiveRuntimeManager mgr = activeRuntimeManagers.get(runtimeId);
+            if (mgr != null) {
+                mgr.deregisterRuntime(runtimeId);
+                mgr.close();
+                activeRuntimeManagers.remove(runtimeId);
+            }
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Exception in closing feed runtime" + e.getMessage(), e);
+            }
+        }
+
+    }
+
+    @Override
+    public synchronized void registerRuntime(ActiveRuntime runtime) {
+        ActiveRuntimeManager runtimeMgr = activeRuntimeManagers.get(runtime.getRuntimeId());
+        if (runtimeMgr == null) {
+            runtimeMgr = new ActiveRuntimeManager();
+            activeRuntimeManagers.put(runtime.getRuntimeId(), runtimeMgr);
+        }
+        runtimeMgr.registerRuntime(runtime.getRuntimeId(), runtime);
+    }
+
+    @Override
+    public ActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
+        ActiveRuntimeManager runtimeMgr = activeRuntimeManagers.get(runtimeId);
+        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(runtimeId) : null;
+    }
+
+    @Override
+    public String toString() {
+        return ActiveRuntimeRegistry.class.getSimpleName() + "[" + nodeId + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java
new file mode 100644
index 0000000..9538118
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/Activity.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.active;
+
+import java.util.Map;
+
+public class Activity implements Comparable<Activity> {
+
+    protected int activityId;
+    protected final EntityId activeEntityId;
+    protected final Map<String, String> activityDetails;
+
+    public Activity(EntityId activeEntityId, Map<String, String> activityDetails) {
+        this.activeEntityId = activeEntityId;
+        this.activityDetails = activityDetails;
+    }
+
+    public String getDataverseName() {
+        return activeEntityId.getDataverse();
+    }
+
+    public String getActiveEntityName() {
+        return activeEntityId.getEntityName();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Activity)) {
+            return false;
+        }
+        return ((Activity) other).activeEntityId.equals(activeEntityId)
+                && ((Activity) other).getActivityId() != (activityId);
+    }
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return activeEntityId + "." + activityId;
+    }
+
+    public int getActivityId() {
+        return activityId;
+    }
+
+    public void setActivityId(int activityId) {
+        this.activityId = activityId;
+    }
+
+    public Map<String, String> getActivityDetails() {
+        return activityDetails;
+    }
+
+    @Override
+    public int compareTo(Activity o) {
+        return o.getActivityId() - this.activityId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
new file mode 100644
index 0000000..1301535
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+// TODO: Document the state machine and its transition.
+public enum ActivityState {
+    CREATED,
+    INACTIVE,
+    ACTIVE,
+    UNDER_RECOVERY,
+    ENDED
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
new file mode 100644
index 0000000..afe3b06
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+public class ConcurrentFramePool {
+    private static final boolean DEBUG = false;
+    private static final String ERROR_INVALID_FRAME_SIZE =
+            "The size should be an integral multiple of the default frame size";
+    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+            "The requested frame size must not be greater than the allocated budget";
+    private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
+    private final String nodeId;
+    private final int budget;
+    private final int defaultFrameSize;
+    private final ArrayDeque<ByteBuffer> pool;
+    private final ArrayDeque<FrameAction> subscribers = new ArrayDeque<>();
+    private final Map<Integer, ArrayDeque<ByteBuffer>> largeFramesPools;
+    private int handedOut;
+    private int created;
+
+    public ConcurrentFramePool(String nodeId, long budgetInBytes, int frameSize) {
+        this.nodeId = nodeId;
+        this.defaultFrameSize = frameSize;
+        this.budget = (int) (budgetInBytes / frameSize);
+        this.pool = new ArrayDeque<>(budget);
+        this.largeFramesPools = new HashMap<>();
+    }
+
+    public int getMaxFrameSize() {
+        return budget * defaultFrameSize;
+    }
+
+    public synchronized ByteBuffer get() {
+        // Subscribers have higher priority
+        if (subscribers.isEmpty()) {
+            return doGet();
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
+        return null;
+    }
+
+    private ByteBuffer doGet() {
+        if (handedOut < budget) {
+            handedOut++;
+            return allocate();
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = 1");
+        }
+        return null;
+    }
+
+    public int remaining() {
+        return budget - handedOut;
+    }
+
+    private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
+        // Subscribers have higher priority
+        if (bufferSize % defaultFrameSize != 0) {
+            throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
+        }
+        int multiplier = bufferSize / defaultFrameSize;
+        if (multiplier > budget) {
+            throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+        }
+        if (handedOut + multiplier <= budget) {
+            handedOut += multiplier;
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
+            if (largeFramesPool == null || largeFramesPool.isEmpty()) {
+                if (created + multiplier > budget) {
+                    freeup(multiplier);
+                }
+                created += multiplier;
+                return ByteBuffer.allocate(bufferSize);
+            }
+            ByteBuffer buffer = largeFramesPool.poll();
+            buffer.clear();
+            return buffer;
+        }
+        // Not enough budget
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = " + multiplier);
+        }
+        return null;
+    }
+
+    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+        if (subscribers.isEmpty()) {
+            return doGet(bufferSize);
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
+        return null;
+    }
+
+    private int freeup(int desiredNumberOfFreePages) {
+        int needToFree = desiredNumberOfFreePages - (budget - created);
+        int freed = 0;
+        // start by large frames
+        for (Iterator<Entry<Integer, ArrayDeque<ByteBuffer>>> it = largeFramesPools.entrySet().iterator(); it
+                .hasNext();) {
+            Entry<Integer, ArrayDeque<ByteBuffer>> entry = it.next();
+            if (entry.getKey() != desiredNumberOfFreePages) {
+                while (!entry.getValue().isEmpty()) {
+                    entry.getValue().pop();
+                    freed += entry.getKey();
+                    if (freed >= needToFree) {
+                        // created is handled here
+                        created -= freed;
+                        return freed;
+                    }
+                }
+                it.remove();
+            }
+        }
+        // freed all large pages. need to free small pages as well
+        needToFree -= freed;
+        while (needToFree > 0) {
+            pool.pop();
+            needToFree--;
+            freed++;
+        }
+        created -= freed;
+        return freed;
+    }
+
+    private ByteBuffer allocate() {
+        if (pool.isEmpty()) {
+            if (created == budget) {
+                freeup(1);
+            }
+            created++;
+            return ByteBuffer.allocate(defaultFrameSize);
+        } else {
+            ByteBuffer buffer = pool.pop();
+            buffer.clear();
+            return buffer;
+        }
+    }
+
+    public synchronized boolean get(Collection<ByteBuffer> buffers, int count) {
+        if (handedOut + count <= budget) {
+            handedOut += count;
+            for (int i = 0; i < count; i++) {
+                buffers.add(allocate());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ConcurrentFramePool  [" + nodeId + "]" + "(consumed:" + handedOut + "/" + budget + ")";
+    }
+
+    public synchronized void release(Collection<ByteBuffer> buffers) throws HyracksDataException {
+        for (ByteBuffer buffer : buffers) {
+            release(buffer);
+        }
+    }
+
+    public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
+        int multiples = buffer.capacity() / defaultFrameSize;
+        handedOut -= multiples;
+        if (DEBUG) {
+            LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
+        }
+        if (multiples == 1) {
+            pool.add(buffer);
+        } else {
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiples);
+            if (largeFramesPool == null) {
+                largeFramesPool = new ArrayDeque<>();
+                largeFramesPools.put(multiples, largeFramesPool);
+            }
+            largeFramesPool.push(buffer);
+        }
+        // check subscribers
+        while (!subscribers.isEmpty()) {
+            FrameAction frameAction = subscribers.peek();
+            ByteBuffer freeBuffer;
+            // check if we have enough and answer immediately.
+            if (frameAction.getSize() == defaultFrameSize) {
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
+                }
+                freeBuffer = doGet();
+            } else {
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested "
+                            + frameAction.getSize() / defaultFrameSize + " frames");
+                }
+                freeBuffer = doGet(frameAction.getSize());
+            }
+            if (freeBuffer != null) {
+                int handedOutBeforeCall = handedOut;
+                try {
+                    frameAction.call(freeBuffer);
+                } catch (Exception e) {
+                    LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+                    // TODO(amoudi): Add test cases and get rid of recursion
+                    if (handedOut == handedOutBeforeCall) {
+                        release(freeBuffer);
+                    }
+                    throw e;
+                } finally {
+                    subscribers.remove();
+                    if (DEBUG) {
+                        LOGGER.info(
+                                "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
+                    }
+                }
+            } else {
+                if (DEBUG) {
+                    LOGGER.info("Failed to allocate requested frames");
+                }
+                break;
+            }
+        }
+        if (DEBUG) {
+            LOGGER.info(subscribers.size() + " remaining subscribers");
+        }
+    }
+
+    public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
+        // check if subscribers are empty?
+        if (subscribers.isEmpty()) {
+            ByteBuffer buffer;
+            // check if we have enough and answer immediately.
+            if (frameAction.getSize() == defaultFrameSize) {
+                buffer = doGet();
+            } else {
+                buffer = doGet(frameAction.getSize());
+            }
+            if (buffer != null) {
+                frameAction.call(buffer);
+                // There is no need to subscribe. perform action and return false
+                return false;
+            }
+        } else {
+            int multiplier = frameAction.getSize() / defaultFrameSize;
+            if (multiplier > budget) {
+                throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+            }
+        }
+        // none of the above, add to subscribers and return true
+        subscribers.add(frameAction);
+        return true;
+    }
+
+    /*
+     * For unit testing purposes
+     */
+    public Collection<FrameAction> getSubscribers() {
+        return subscribers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
new file mode 100644
index 0000000..cdf702d
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/EntityId.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+/**
+ * A unique identifier for a data feed.
+ */
+public class EntityId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String extensionName;
+    private final String dataverse;
+    private final String entityName;
+    private final int hashCode;
+
+    public EntityId(String extentionName, String dataverse, String entityName) {
+        this.extensionName = extentionName;
+        this.dataverse = dataverse;
+        this.entityName = entityName;
+        this.hashCode = toString().hashCode();
+    }
+
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof EntityId)) {
+            return false;
+        }
+        if (this == o || ((EntityId) o).getExtensionName().equals(extensionName)
+                && ((EntityId) o).getEntityName().equals(entityName)
+                && ((EntityId) o).getDataverse().equals(dataverse)) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        return dataverse + "." + entityName + "(" + extensionName + ")";
+    }
+
+    public String getExtensionName() {
+        return extensionName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
new file mode 100644
index 0000000..849d360
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.nio.ByteBuffer;
+
+import org.apache.log4j.Logger;
+
+public class FrameAction {
+    private static final boolean DEBUG = false;
+    private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
+    private ByteBuffer allocated;
+    private ByteBuffer frame;
+
+    public void call(ByteBuffer freeFrame) {
+        if (DEBUG) {
+            LOGGER.info("FrameAction: My subscription is being answered");
+        }
+        freeFrame.put(frame);
+        synchronized (this) {
+            allocated = freeFrame;
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Waking up waiting threads");
+            }
+            notifyAll();
+        }
+    }
+
+    public synchronized ByteBuffer retrieve() throws InterruptedException {
+        if (DEBUG) {
+            LOGGER.info("FrameAction: Attempting to get allocated buffer");
+        }
+        while (allocated == null) {
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
+            }
+            wait();
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Awoken Up");
+            }
+        }
+        ByteBuffer temp = allocated;
+        allocated = null;
+        return temp;
+    }
+
+    public void setFrame(ByteBuffer frame) {
+        this.frame = frame;
+    }
+
+    public int getSize() {
+        return frame.capacity();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
new file mode 100644
index 0000000..156576c
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public interface IActiveEntityEventsListener {
+
+    public void notify(ActiveEvent message);
+
+    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification);
+
+    public boolean isEntityActive();
+
+    public EntityId getEntityId();
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java
new file mode 100644
index 0000000..e4c7171
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.dataflow.value.JSONSerializable;
+
+/**
+ * @deprecated
+ *             This interface is expected to go away. instead, one should use the IMessageBroker interfaces to exchange
+ *             messages
+ */
+@Deprecated
+public interface IActiveMessage extends Serializable, JSONSerializable {
+
+    public enum MessageType {
+        END
+    }
+
+    /**
+     * Gets the type associated with this message
+     *
+     * @return MessageType type associated with this message
+     */
+    public MessageType getMessageType();
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
new file mode 100644
index 0000000..32c5c50
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IActiveRuntime {
+
+    /**
+     * @return the unique runtime id associated with the feedRuntime
+     */
+    public ActiveRuntimeId getRuntimeId();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
new file mode 100644
index 0000000..b2c6f8e
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntimeRegistry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+import java.io.IOException;
+
+public interface IActiveRuntimeRegistry {
+
+    /**
+     * Allows registration of a runtime.
+     *
+     * @param runtime
+     * @throws Exception
+     */
+    public void registerRuntime(ActiveRuntime runtime);
+
+    /**
+     * Obtain runtime corresponding to a feedRuntimeId
+     *
+     * @param runtimeId
+     * @return
+     */
+    public ActiveRuntime getRuntime(ActiveRuntimeId runtimeId);
+
+    /**
+     * De-register a feed
+     *
+     * @param feedConnection
+     * @throws IOException
+     */
+    void deregisterRuntime(ActiveRuntimeId runtimeId);
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 327154f..d623830 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.declared.AqlSourceId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index a161717..83e4375 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -47,7 +47,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair;
  */
 public abstract class AbstractLangTranslator {
 
-    protected static final Logger LOGGER = Logger.getLogger(AbstractLangTranslator.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(AbstractLangTranslator.class.getName());
 
     protected static final Map<String, BuiltinType> builtinTypeMap = AsterixBuiltinTypeMap.getBuiltinTypes();
 
@@ -107,7 +107,7 @@ public abstract class AbstractLangTranslator {
         String message = null;
         String dataverse = defaultDataverse != null ? defaultDataverse.getDataverseName() : null;
         switch (stmt.getKind()) {
-            case INSERT:
+            case Statement.INSERT:
                 InsertStatement insertStmt = (InsertStatement) stmt;
                 if (insertStmt.getDataverseName() != null) {
                     dataverse = insertStmt.getDataverseName().getValue();
@@ -119,7 +119,7 @@ public abstract class AbstractLangTranslator {
                 }
                 break;
 
-            case DELETE:
+            case Statement.DELETE:
                 DeleteStatement deleteStmt = (DeleteStatement) stmt;
                 if (deleteStmt.getDataverseName() != null) {
                     dataverse = deleteStmt.getDataverseName().getValue();
@@ -131,7 +131,7 @@ public abstract class AbstractLangTranslator {
                 }
                 break;
 
-            case NODEGROUP_DROP:
+            case Statement.NODEGROUP_DROP:
                 String nodegroupName = ((NodeGroupDropStatement) stmt).getNodeGroupName().getValue();
                 invalidOperation = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME.equals(nodegroupName);
                 if (invalidOperation) {
@@ -139,16 +139,16 @@ public abstract class AbstractLangTranslator {
                 }
                 break;
 
-            case DATAVERSE_DROP:
+            case Statement.DATAVERSE_DROP:
                 DataverseDropStatement dvDropStmt = (DataverseDropStatement) stmt;
-                invalidOperation = MetadataConstants.METADATA_DATAVERSE_NAME
-                        .equals(dvDropStmt.getDataverseName().getValue());
+                invalidOperation =
+                        MetadataConstants.METADATA_DATAVERSE_NAME.equals(dvDropStmt.getDataverseName().getValue());
                 if (invalidOperation) {
                     message = "Cannot drop dataverse:" + dvDropStmt.getDataverseName().getValue();
                 }
                 break;
 
-            case DATASET_DROP:
+            case Statement.DATASET_DROP:
                 DropStatement dropStmt = (DropStatement) stmt;
                 if (dropStmt.getDataverseName() != null) {
                     dataverse = dropStmt.getDataverseName().getValue();
@@ -159,7 +159,7 @@ public abstract class AbstractLangTranslator {
                             + MetadataConstants.METADATA_DATAVERSE_NAME;
                 }
                 break;
-            case DATASET_DECL:
+            case Statement.DATASET_DECL:
                 DatasetDecl datasetStmt = (DatasetDecl) stmt;
                 Map<String, String> hints = datasetStmt.getHints();
                 if (hints != null && !hints.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 9d092a7..7a3b2a4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.lang.common.base.Expression;
-import org.apache.asterix.lang.common.base.Statement.Kind;
+import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.om.types.IAType;
@@ -38,7 +38,7 @@ public class CompiledStatements {
 
     public static interface ICompiledStatement {
 
-        public Kind getKind();
+        public byte getKind();
     }
 
     public static class CompiledDatasetDropStatement implements ICompiledStatement {
@@ -59,8 +59,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.DATASET_DROP;
+        public byte getKind() {
+            return Statement.DATASET_DROP;
         }
     }
 
@@ -83,8 +83,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.CREATE_DATAVERSE;
+        public byte getKind() {
+            return Statement.CREATE_DATAVERSE;
         }
     }
 
@@ -100,8 +100,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.NODEGROUP_DROP;
+        public byte getKind() {
+            return Statement.NODEGROUP_DROP;
         }
     }
 
@@ -129,8 +129,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.INDEX_DROP;
+        public byte getKind() {
+            return Statement.INDEX_DROP;
         }
     }
 
@@ -152,8 +152,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.DATAVERSE_DROP;
+        public byte getKind() {
+            return Statement.DATAVERSE_DROP;
         }
     }
 
@@ -169,8 +169,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.TYPE_DROP;
+        public byte getKind() {
+            return Statement.TYPE_DROP;
         }
     }
 
@@ -241,8 +241,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.CREATE_INDEX;
+        public byte getKind() {
+            return Statement.CREATE_INDEX;
         }
     }
 
@@ -285,8 +285,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.LOAD;
+        public byte getKind() {
+            return Statement.LOAD;
         }
     }
 
@@ -322,8 +322,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.INSERT;
+        public byte getKind() {
+            return Statement.INSERT;
         }
     }
 
@@ -334,8 +334,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.UPSERT;
+        public byte getKind() {
+            return Statement.UPSERT;
         }
     }
 
@@ -380,8 +380,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.CONNECT_FEED;
+        public byte getKind() {
+            return Statement.CONNECT_FEED;
         }
 
         public String getPolicyName() {
@@ -405,7 +405,7 @@ public class CompiledStatements {
         }
 
         public String getFeedName() {
-            return request.getReceivingFeedId().getFeedName();
+            return request.getReceivingFeedId().getEntityName();
         }
 
         @Override
@@ -418,8 +418,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.SUBSCRIBE_FEED;
+        public byte getKind() {
+            return Statement.SUBSCRIBE_FEED;
         }
     }
 
@@ -449,8 +449,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.DISCONNECT_FEED;
+        public byte getKind() {
+            return Statement.DISCONNECT_FEED;
         }
 
     }
@@ -494,8 +494,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.DELETE;
+        public byte getKind() {
+            return Statement.DELETE;
         }
 
     }
@@ -518,8 +518,8 @@ public class CompiledStatements {
         }
 
         @Override
-        public Kind getKind() {
-            return Kind.COMPACT;
+        public byte getKind() {
+            return Statement.COMPACT;
         }
     }