You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 13:34:05 UTC

[3/7] asterixdb git commit: Refactor General Active Classes

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
deleted file mode 100644
index b8375e3..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public interface IFeedRuntime {
-
-    public enum FeedRuntimeType {
-        INTAKE,
-        COLLECT,
-        COMPUTE_COLLECT,
-        COMPUTE,
-        STORE,
-        OTHER,
-        ETS,
-        JOIN
-    }
-
-    public enum Mode {
-        PROCESS,            // There is memory
-        SPILL,              // Memory budget has been consumed. Now we're writing to disk
-        DISCARD             // Memory budget has been consumed. Disk space budget has been consumed. Now we're
-                            // discarding
-    }
-
-    /**
-     * @return the unique runtime id associated with the feedRuntime
-     */
-    public FeedRuntimeId getRuntimeId();
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
index 1ca46ce..397f797 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
@@ -18,13 +18,14 @@
  */
 package org.apache.asterix.external.feed.api;
 
+import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Represent a feed runtime whose output can be routed along other parallel path(s).
  */
-public interface ISubscribableRuntime extends IFeedRuntime {
+public interface ISubscribableRuntime extends IActiveRuntime {
 
     /**
      * @param collectionRuntime

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index f356899..ae2e0b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -21,13 +21,11 @@ package org.apache.asterix.external.feed.dataflow;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 /**
  * Provides mechanism for distributing the frames, as received from an operator to a
@@ -38,7 +36,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 public class DistributeFeedFrameWriter implements IFrameWriter {
 
     /** A unique identifier for the feed to which the incoming tuples belong. **/
-    private final FeedId feedId;
+    private final EntityId feedId;
 
     /**
      * An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each
@@ -55,8 +53,8 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
     /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
     private final int partition;
 
-    public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
-            FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta) throws IOException {
+    public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType,
+            int partition) throws IOException {
         this.feedId = feedId;
         this.frameDistributor = new FrameDistributor();
         this.feedRuntimeType = feedRuntimeType;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 8ed2bf9..2e1c83f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -23,11 +23,12 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ConcurrentFramePool;
+import org.apache.asterix.active.FrameAction;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.util.FeedUtils.Mode;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -67,7 +68,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
     private int numProcessedInMemory = 0;
     private int numStalled = 0;
 
-    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
+    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, ActiveRuntimeId runtimeId,
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
deleted file mode 100644
index f02b4aa..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-
-import org.apache.log4j.Logger;
-
-public class FrameAction {
-    private static final boolean DEBUG = false;
-    private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
-    private ByteBuffer allocated;
-    private ByteBuffer frame;
-
-    public void call(ByteBuffer freeFrame) {
-        if (DEBUG) {
-            LOGGER.info("FrameAction: My subscription is being answered");
-        }
-        freeFrame.put(frame);
-        synchronized (this) {
-            allocated = freeFrame;
-            if (DEBUG) {
-                LOGGER.info("FrameAction: Waking up waiting threads");
-            }
-            notifyAll();
-        }
-    }
-
-    public synchronized ByteBuffer retrieve() throws InterruptedException {
-        if (DEBUG) {
-            LOGGER.info("FrameAction: Attempting to get allocated buffer");
-        }
-        while (allocated == null) {
-            if (DEBUG) {
-                LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
-            }
-            wait();
-            if (DEBUG) {
-                LOGGER.info("FrameAction: Awoken Up");
-            }
-        }
-        ByteBuffer temp = allocated;
-        allocated = null;
-        return temp;
-    }
-
-    public void setFrame(ByteBuffer frame) {
-        this.frame = frame;
-    }
-
-    public int getSize() {
-        return frame.capacity();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
deleted file mode 100644
index e5543d6..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.external.feed.dataflow.FrameAction;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class ConcurrentFramePool {
-    private static final boolean DEBUG = false;
-    private static final String ERROR_INVALID_FRAME_SIZE =
-            "The size should be an integral multiple of the default frame size";
-    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
-            "The requested frame size must not be greater than the allocated budget";
-    private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
-    private final String nodeId;
-    private final int budget;
-    private final int defaultFrameSize;
-    private final ArrayDeque<ByteBuffer> pool;
-    private final ArrayDeque<FrameAction> subscribers = new ArrayDeque<>();
-    private final Map<Integer, ArrayDeque<ByteBuffer>> largeFramesPools;
-    private int handedOut;
-    private int created;
-
-    public ConcurrentFramePool(String nodeId, long budgetInBytes, int frameSize) {
-        this.nodeId = nodeId;
-        this.defaultFrameSize = frameSize;
-        this.budget = (int) (budgetInBytes / frameSize);
-        this.pool = new ArrayDeque<>(budget);
-        this.largeFramesPools = new HashMap<>();
-    }
-
-    public int getMaxFrameSize() {
-        return budget * defaultFrameSize;
-    }
-
-    public synchronized ByteBuffer get() {
-        // Subscribers have higher priority
-        if (subscribers.isEmpty()) {
-            return doGet();
-        }
-        if (DEBUG) {
-            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
-                    + subscribers.size());
-        }
-        return null;
-    }
-
-    private ByteBuffer doGet() {
-        if (handedOut < budget) {
-            handedOut++;
-            return allocate();
-        }
-        if (DEBUG) {
-            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
-                    + ", Requested = 1");
-        }
-        return null;
-    }
-
-    public int remaining() {
-        return budget - handedOut;
-    }
-
-    private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
-        // Subscribers have higher priority
-        if (bufferSize % defaultFrameSize != 0) {
-            throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
-        }
-        int multiplier = bufferSize / defaultFrameSize;
-        if (multiplier > budget) {
-            throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
-        }
-        if (handedOut + multiplier <= budget) {
-            handedOut += multiplier;
-            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
-            if (largeFramesPool == null || largeFramesPool.isEmpty()) {
-                if (created + multiplier > budget) {
-                    freeup(multiplier);
-                }
-                created += multiplier;
-                return ByteBuffer.allocate(bufferSize);
-            }
-            ByteBuffer buffer = largeFramesPool.poll();
-            buffer.clear();
-            return buffer;
-        }
-        // Not enough budget
-        if (DEBUG) {
-            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
-                    + ", Requested = " + multiplier);
-        }
-        return null;
-    }
-
-    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
-        if (subscribers.isEmpty()) {
-            return doGet(bufferSize);
-        }
-        if (DEBUG) {
-            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
-                    + subscribers.size());
-        }
-        return null;
-    }
-
-    private int freeup(int desiredNumberOfFreePages) {
-        int needToFree = desiredNumberOfFreePages - (budget - created);
-        int freed = 0;
-        // start by large frames
-        for (Iterator<Entry<Integer, ArrayDeque<ByteBuffer>>> it = largeFramesPools.entrySet().iterator(); it
-                .hasNext();) {
-            Entry<Integer, ArrayDeque<ByteBuffer>> entry = it.next();
-            if (entry.getKey() != desiredNumberOfFreePages) {
-                while (!entry.getValue().isEmpty()) {
-                    entry.getValue().pop();
-                    freed += entry.getKey();
-                    if (freed >= needToFree) {
-                        // created is handled here
-                        created -= freed;
-                        return freed;
-                    }
-                }
-                it.remove();
-            }
-        }
-        // freed all large pages. need to free small pages as well
-        needToFree -= freed;
-        while (needToFree > 0) {
-            pool.pop();
-            needToFree--;
-            freed++;
-        }
-        created -= freed;
-        return freed;
-    }
-
-    private ByteBuffer allocate() {
-        if (pool.isEmpty()) {
-            if (created == budget) {
-                freeup(1);
-            }
-            created++;
-            return ByteBuffer.allocate(defaultFrameSize);
-        } else {
-            ByteBuffer buffer = pool.pop();
-            buffer.clear();
-            return buffer;
-        }
-    }
-
-    public synchronized boolean get(Collection<ByteBuffer> buffers, int count) {
-        if (handedOut + count <= budget) {
-            handedOut += count;
-            for (int i = 0; i < count; i++) {
-                buffers.add(allocate());
-            }
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        return "ConcurrentFramePool  [" + nodeId + "]" + "(consumed:" + handedOut + "/" + budget + ")";
-    }
-
-    public synchronized void release(Collection<ByteBuffer> buffers) throws HyracksDataException {
-        for (ByteBuffer buffer : buffers) {
-            release(buffer);
-        }
-    }
-
-    public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
-        int multiples = buffer.capacity() / defaultFrameSize;
-        handedOut -= multiples;
-        if (DEBUG) {
-            LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
-        }
-        if (multiples == 1) {
-            pool.add(buffer);
-        } else {
-            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiples);
-            if (largeFramesPool == null) {
-                largeFramesPool = new ArrayDeque<>();
-                largeFramesPools.put(multiples, largeFramesPool);
-            }
-            largeFramesPool.push(buffer);
-        }
-        // check subscribers
-        while (!subscribers.isEmpty()) {
-            FrameAction frameAction = subscribers.peek();
-            ByteBuffer freeBuffer;
-            // check if we have enough and answer immediately.
-            if (frameAction.getSize() == defaultFrameSize) {
-                if (DEBUG) {
-                    LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
-                }
-                freeBuffer = doGet();
-            } else {
-                if (DEBUG) {
-                    LOGGER.info("Attempting to callback a subscriber that requested "
-                            + frameAction.getSize() / defaultFrameSize + " frames");
-                }
-                freeBuffer = doGet(frameAction.getSize());
-            }
-            if (freeBuffer != null) {
-                int handedOutBeforeCall = handedOut;
-                try {
-                    frameAction.call(freeBuffer);
-                } catch (Exception e) {
-                    LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
-                    // TODO(amoudi): Add test cases and get rid of recursion
-                    if (handedOut == handedOutBeforeCall) {
-                        release(freeBuffer);
-                    }
-                    throw e;
-                } finally {
-                    subscribers.remove();
-                    if (DEBUG) {
-                        LOGGER.info(
-                                "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
-                    }
-                }
-            } else {
-                if (DEBUG) {
-                    LOGGER.info("Failed to allocate requested frames");
-                }
-                break;
-            }
-        }
-        if (DEBUG) {
-            LOGGER.info(subscribers.size() + " remaining subscribers");
-        }
-    }
-
-    public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
-        // check if subscribers are empty?
-        if (subscribers.isEmpty()) {
-            ByteBuffer buffer;
-            // check if we have enough and answer immediately.
-            if (frameAction.getSize() == defaultFrameSize) {
-                buffer = doGet();
-            } else {
-                buffer = doGet(frameAction.getSize());
-            }
-            if (buffer != null) {
-                frameAction.call(buffer);
-                // There is no need to subscribe. perform action and return false
-                return false;
-            }
-        } else {
-            int multiplier = frameAction.getSize() / defaultFrameSize;
-            if (multiplier > budget) {
-                throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
-            }
-        }
-        // none of the above, add to subscribers and return true
-        subscribers.add(frameAction);
-        return true;
-    }
-
-    /*
-     * For unit testing purposes
-     */
-    public Collection<FrameAction> getSubscribers() {
-        return subscribers;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
deleted file mode 100644
index 9f861d4..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedCollectInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedCollectInfo extends FeedInfo {
-    public FeedId sourceFeedId;
-    public FeedConnectionId feedConnectionId;
-    public List<String> collectLocations = new ArrayList<String>();
-    public List<String> computeLocations = new ArrayList<String>();
-    public List<String> storageLocations = new ArrayList<String>();
-    public Map<String, String> feedPolicy;
-    public String superFeedManagerHost;
-    public int superFeedManagerPort;
-    public boolean fullyConnected;
-
-    public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
-            JobId jobId, Map<String, String> feedPolicy) {
-        super(jobSpec, jobId, FeedInfoType.COLLECT);
-        this.sourceFeedId = sourceFeedId;
-        this.feedConnectionId = feedConnectionId;
-        this.feedPolicy = feedPolicy;
-        this.fullyConnected = true;
-    }
-
-    @Override
-    public String toString() {
-        return FeedInfoType.COLLECT + "[" + feedConnectionId + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
index 13f19b8..e2ab823 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -20,6 +20,8 @@ package org.apache.asterix.external.feed.management;
 
 import java.io.Serializable;
 
+import org.apache.asterix.active.EntityId;
+
 /**
  * A unique identifier for a feed connection. A feed connection is an instance of a data feed that is flowing into a
  * dataset.
@@ -27,22 +29,23 @@ import java.io.Serializable;
 public class FeedConnectionId implements Serializable {
 
     private static final long serialVersionUID = 1L;
+    public static final String FEED_EXTENSION_NAME = "Feed";
 
-    private final FeedId feedId;            // Dataverse - Feed
-    private final String datasetName;       // Dataset <Dataset is empty in case of no target dataset>
+    private final EntityId feedId; // Dataverse - Feed
+    private final String datasetName; // Dataset <Dataset is empty in case of no target dataset>
     private final int hash;
 
-    public FeedConnectionId(FeedId feedId, String datasetName) {
+    public FeedConnectionId(EntityId feedId, String datasetName) {
         this.feedId = feedId;
         this.datasetName = datasetName;
         this.hash = toString().hashCode();
     }
 
     public FeedConnectionId(String dataverse, String feedName, String datasetName) {
-        this(new FeedId(dataverse, feedName), datasetName);
+        this(new EntityId(FEED_EXTENSION_NAME, dataverse, feedName), datasetName);
     }
 
-    public FeedId getFeedId() {
+    public EntityId getFeedId() {
         return feedId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
deleted file mode 100644
index a746ef4..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedConnectionManager implements IFeedConnectionManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedConnectionManager.class.getName());
-
-    private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
-    private final String nodeId;
-
-    public FeedConnectionManager(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    @Override
-    public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
-        return feedRuntimeManagers.get(feedId);
-    }
-
-    @Override
-    public void deregisterFeed(FeedConnectionId feedId) {
-        try {
-            FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
-            if (mgr != null) {
-                mgr.close();
-                feedRuntimeManagers.remove(feedId);
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Exception in closing feed runtime" + e.getMessage());
-            }
-        }
-
-    }
-
-    @Override
-    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
-        if (runtimeMgr == null) {
-            runtimeMgr = new FeedRuntimeManager(connectionId, this);
-            feedRuntimeManagers.put(connectionId, runtimeMgr);
-        }
-        runtimeMgr.registerFeedRuntime(feedRuntime.getRuntimeId(), feedRuntime);
-    }
-
-    @Override
-    public void deRegisterFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
-        if (runtimeMgr != null) {
-            runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
-        }
-    }
-
-    @Override
-    public FeedRuntime getFeedRuntime(FeedConnectionId connectionId, FeedRuntimeId feedRuntimeId) {
-        FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
-        return runtimeMgr != null ? runtimeMgr.getFeedRuntime(feedRuntimeId) : null;
-    }
-
-    @Override
-    public String toString() {
-        return "FeedManager " + "[" + nodeId + "]";
-    }
-
-    @Override
-    public List<FeedRuntimeId> getRegisteredRuntimes() {
-        List<FeedRuntimeId> runtimes = new ArrayList<FeedRuntimeId>();
-        for (Entry<FeedConnectionId, FeedRuntimeManager> entry : feedRuntimeManagers.entrySet()) {
-            runtimes.addAll(entry.getValue().getFeedRuntimes());
-        }
-        return runtimes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
index b1d3300..1106160 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionRequest.java
@@ -18,16 +18,20 @@
  */
 package org.apache.asterix.external.feed.management;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.commons.lang3.StringUtils;
 
 /**
  * A request for connecting a feed to a dataset.
  */
-public class FeedConnectionRequest {
+public class FeedConnectionRequest implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     public enum ConnectionStatus {
         /** initial state upon creating a connection request **/
@@ -64,11 +68,11 @@ public class FeedConnectionRequest {
     /** Target dataset associated with the connection request **/
     private final String targetDataset;
 
-    private final FeedId receivingFeedId;
+    private final EntityId receivingFeedId;
 
     public FeedConnectionRequest(FeedJointKey feedPointKey, FeedRuntimeType connectionLocation,
             List<String> functionsToApply, String targetDataset, String policy, Map<String, String> policyParameters,
-            FeedId receivingFeedId) {
+            EntityId receivingFeedId) {
         this.feedJointKey = feedPointKey;
         this.connectionLocation = connectionLocation;
         this.functionsToApply = functionsToApply;
@@ -103,7 +107,7 @@ public class FeedConnectionRequest {
         return connectionLocation;
     }
 
-    public FeedId getReceivingFeedId() {
+    public EntityId getReceivingFeedId() {
         return receivingFeedId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
new file mode 100644
index 0000000..40d2500
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.api.FeedOperationCounter;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedJoint.State;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.util.FeedUtils.JobType;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
+import org.apache.log4j.Logger;
+
+public class FeedEventsListener implements IActiveEntityEventsListener {
+    private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
+    private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
+    private final List<IFeedLifecycleEventSubscriber> subscribers;
+    private final Map<Long, ActiveJob> jobs;
+    private final Map<Long, ActiveJob> intakeJobs;
+    private final Map<EntityId, FeedIntakeInfo> entity2Intake;
+    private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
+    private EntityId entityId;
+    private IFeedJoint sourceFeedJoint;
+
+    public FeedEventsListener(EntityId entityId) {
+        this.entityId = entityId;
+        subscribers = new ArrayList<>();
+        jobs = new HashMap<>();
+        feedPipeline = new HashMap<>();
+        entity2Intake = new HashMap<>();
+        connectJobInfos = new HashMap<>();
+        intakeJobs = new HashMap<>();
+    }
+
+    @Override
+    public void notify(ActiveEvent event) {
+        try {
+            switch (event.getEventKind()) {
+                case JOB_START:
+                    handleJobStartEvent(event);
+                    break;
+                case JOB_FINISH:
+                    handleJobFinishEvent(event);
+                    break;
+                case PARTITION_EVENT:
+                    handlePartitionStart(event);
+                    break;
+                default:
+                    LOGGER.warn("Unknown Feed Event" + event);
+                    break;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unhandled Exception", e);
+        }
+    }
+
+    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        JobType jobType = (JobType) jobInfo.getJobObject();
+        switch (jobType) {
+            case INTAKE:
+                handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
+                break;
+            case FEED_CONNECT:
+                handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
+                break;
+            default:
+        }
+    }
+
+    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        JobType jobType = (JobType) jobInfo.getJobObject();
+        switch (jobType) {
+            case FEED_CONNECT:
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Collect Job finished for  " + jobInfo);
+                }
+                handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
+                break;
+            case INTAKE:
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
+                }
+                handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
+                break;
+            default:
+                break;
+        }
+    }
+
+    private synchronized void handlePartitionStart(ActiveEvent message) {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        JobType jobType = (JobType) jobInfo.getJobObject();
+        switch (jobType) {
+            case FEED_CONNECT:
+                ((FeedConnectJobInfo) jobInfo).partitionStart();
+                if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
+                    notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                }
+                break;
+            case INTAKE:
+                handleIntakePartitionStarts(message, jobInfo);
+                break;
+            default:
+                break;
+
+        }
+    }
+
+    private void handleIntakePartitionStarts(ActiveEvent message, ActiveJob jobInfo) {
+        if (feedPipeline.get(message.getFeedId()).first.decrementAndGet() == 0) {
+            ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
+            jobInfo.setState(ActivityState.ACTIVE);
+            notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+        }
+    }
+
+    public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) {
+        Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline =
+                feedPipeline.get(feedJoint.getOwnerFeedId());
+        if (feedJointsOnPipeline == null) {
+            feedJointsOnPipeline = new Pair<>(new FeedOperationCounter(numOfPrividers), new ArrayList<IFeedJoint>());
+            feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
+            feedJointsOnPipeline.second.add(feedJoint);
+        } else {
+            if (!feedJointsOnPipeline.second.contains(feedJoint)) {
+                feedJointsOnPipeline.second.add(feedJoint);
+            } else {
+                throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
+            }
+        }
+    }
+
+    public synchronized void deregisterFeedIntakeJob(JobId jobId) {
+        FeedIntakeInfo info = (FeedIntakeInfo) intakeJobs.remove(jobId.getId());
+        jobs.remove(jobId.getId());
+        entity2Intake.remove(info.getFeedId());
+        List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second;
+        joints.remove(info.getIntakeFeedJoint());
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Deregistered feed intake job [" + jobId + "]");
+        }
+    }
+
+    private static synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
+        List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<>();
+        Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+                intakeOperatorIds.add(opDesc.getOperatorId());
+            }
+        }
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
+        List<String> intakeLocations = new ArrayList<>();
+        for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
+            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
+            int nOperatorInstances = operatorLocations.size();
+            for (int i = 0; i < nOperatorInstances; i++) {
+                intakeLocations.add(operatorLocations.get(i));
+            }
+        }
+        // intakeLocations is an ordered list; 
+        // element at position i corresponds to location of i'th instance of operator
+        intakeJobInfo.setIntakeLocation(intakeLocations);
+    }
+
+    public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
+        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+        if (cInfo != null) {
+            return cInfo.getSourceFeedJoint();
+        }
+        return null;
+    }
+
+    public synchronized void registerFeedIntakeJob(EntityId feedId, JobId jobId, JobSpecification jobSpec)
+            throws HyracksDataException {
+        if (entity2Intake.get(feedId) != null) {
+            throw new IllegalStateException("Feed already has an intake job");
+        }
+        if (intakeJobs.get(jobId.getId()) != null) {
+            throw new IllegalStateException("Feed job already registered in intake jobs");
+        }
+        if (jobs.get(jobId.getId()) != null) {
+            throw new IllegalStateException("Feed job already registered in all jobs");
+        }
+
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
+        sourceFeedJoint = null;
+        for (IFeedJoint joint : pair.second) {
+            if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
+                sourceFeedJoint = joint;
+                break;
+            }
+        }
+
+        if (sourceFeedJoint != null) {
+            FeedIntakeInfo intakeJobInfo =
+                    new FeedIntakeInfo(jobId, ActivityState.CREATED, feedId, sourceFeedJoint, jobSpec);
+            pair.first.setFeedJobInfo(intakeJobInfo);
+            entity2Intake.put(feedId, intakeJobInfo);
+            jobs.put(jobId.getId(), intakeJobInfo);
+            intakeJobs.put(jobId.getId(), intakeJobInfo);
+
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
+            }
+        } else {
+            throw new HyracksDataException(
+                    "Could not register feed intake job [" + jobId + "]" + " for feed  " + feedId);
+        }
+    }
+
+    public synchronized void registerFeedCollectionJob(EntityId sourceFeedId, FeedConnectionId connectionId,
+            JobId jobId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
+        if (jobs.get(jobId.getId()) != null) {
+            throw new IllegalStateException("Feed job already registered");
+        }
+        if (connectJobInfos.containsKey(jobId.getId())) {
+            throw new IllegalStateException("Feed job already registered");
+        }
+
+        List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second;
+        FeedConnectionId cid = null;
+        IFeedJoint collectionSourceFeedJoint = null;
+        for (IFeedJoint joint : feedJoints) {
+            cid = joint.getReceiver(connectionId);
+            if (cid != null) {
+                collectionSourceFeedJoint = joint;
+                break;
+            }
+        }
+
+        if (cid != null) {
+            FeedConnectJobInfo cInfo = new FeedConnectJobInfo(sourceFeedId, jobId, ActivityState.CREATED, connectionId,
+                    collectionSourceFeedJoint, null, jobSpec, feedPolicy);
+            jobs.put(jobId.getId(), cInfo);
+            connectJobInfos.put(connectionId, cInfo);
+
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
+            }
+        } else {
+            LOGGER.warn(
+                    "Could not register feed collection job [" + jobId + "]" + " for feed connection " + connectionId);
+        }
+    }
+
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
+        FeedConnectionId feedConnectionId = null;
+        Map<String, String> feedPolicy = null;
+        try {
+            for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+                if (opDesc instanceof FeedCollectOperatorDescriptor) {
+                    feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId();
+                    feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties();
+                    registerFeedCollectionJob(((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(),
+                            feedConnectionId, jobId, spec, feedPolicy);
+                    return;
+                } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+                    registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(), jobId, spec);
+                    return;
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error(e);
+        }
+    }
+
+    public synchronized List<String> getConnectionLocations(IFeedJoint feedJoint, final FeedConnectionRequest request)
+            throws Exception {
+        List<String> locations = null;
+        switch (feedJoint.getType()) {
+            case COMPUTE:
+                FeedConnectionId connectionId = feedJoint.getProvider();
+                FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+                locations = cInfo.getComputeLocations();
+                break;
+            case INTAKE:
+                FeedIntakeInfo intakeInfo = entity2Intake.get(feedJoint.getOwnerFeedId());
+                locations = intakeInfo.getIntakeLocation();
+                break;
+            default:
+                break;
+        }
+        return locations;
+    }
+
+    private void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
+        if (subscribers != null && !subscribers.isEmpty()) {
+            for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
+                subscriber.handleFeedEvent(event);
+            }
+        }
+    }
+
+    private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, ActiveEvent message)
+            throws Exception {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobInfo info = hcc.getJobInfo(message.getJobId());
+        JobStatus status = info.getStatus();
+        EntityId feedId = intakeInfo.getFeedId();
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
+        if (status.equals(JobStatus.FAILURE)) {
+            pair.first.setFailedIngestion(true);
+        }
+        // remove feed joints
+        deregisterFeedIntakeJob(message.getJobId());
+        // notify event listeners
+        feedPipeline.remove(feedId);
+        entity2Intake.remove(feedId);
+        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
+                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
+    }
+
+    private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
+        FeedConnectionId connectionId = cInfo.getConnectionId();
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        JobStatus status = info.getStatus();
+        boolean failure = status != null && status.equals(JobStatus.FAILURE);
+        FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
+        boolean retainSubsription =
+                cInfo.getState().equals(ActivityState.UNDER_RECOVERY) || (failure && fpa.continueOnHardwareFailure());
+
+        if (!retainSubsription) {
+            IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
+            feedJoint.removeReceiver(connectionId);
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info(
+                        "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
+            }
+        }
+
+        connectJobInfos.remove(connectionId);
+        jobs.remove(cInfo.getJobId().getId());
+        // notify event listeners
+        FeedLifecycleEvent event =
+                failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_COLLECT_ENDED;
+        notifyFeedEventSubscribers(event);
+    }
+
+    public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
+        return connectJobInfos.get(connectionId).getStorageLocations();
+    }
+
+    public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
+        return connectJobInfos.get(connectionId).getCollectLocations();
+    }
+
+    public List<String> getFeedIntakeLocations(EntityId feedId) {
+        return entity2Intake.get(feedId).getIntakeLocation();
+    }
+
+    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
+        return connectJobInfos.get(connectionId).getJobId();
+    }
+
+    public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
+        List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
+                ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
+        if (joints != null && !joints.isEmpty()) {
+            for (IFeedJoint joint : joints) {
+                if (joint.getFeedJointKey().equals(feedJointKey)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public Collection<IFeedJoint> getFeedIntakeJoints() {
+        List<IFeedJoint> intakeFeedPoints = new ArrayList<>();
+        for (FeedIntakeInfo info : entity2Intake.values()) {
+            intakeFeedPoints.add(info.getIntakeFeedJoint());
+        }
+        return intakeFeedPoints;
+    }
+
+    public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
+        List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId())
+                ? feedPipeline.get(feedPointKey.getFeedId()).second : null;
+        if (joints != null && !joints.isEmpty()) {
+            for (IFeedJoint joint : joints) {
+                if (joint.getFeedJointKey().equals(feedPointKey)) {
+                    return joint;
+                }
+            }
+        }
+        return null;
+    }
+
+    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
+        IFeedJoint feedJoint = getFeedJoint(feedJointKey);
+        if (feedJoint != null) {
+            return feedJoint;
+        } else {
+            String jointKeyString = feedJointKey.getStringRep();
+            List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId())
+                    ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
+            IFeedJoint candidateJoint = null;
+            if (jointsOnPipeline != null) {
+                for (IFeedJoint joint : jointsOnPipeline) {
+                    if (jointKeyString.contains(joint.getFeedJointKey().getStringRep()) && (candidateJoint == null
+                            || /*found feed point is a super set of the earlier find*/joint.getFeedJointKey()
+                                    .getStringRep().contains(candidateJoint.getFeedJointKey().getStringRep()))) {
+                        candidateJoint = joint;
+                    }
+                }
+            }
+            return candidateJoint;
+        }
+    }
+
+    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
+        return connectJobInfos.get(connectionId).getSpec();
+    }
+
+    public IFeedJoint getFeedPoint(EntityId sourceFeedId, IFeedJoint.FeedJointType type) {
+        List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second;
+        for (IFeedJoint joint : joints) {
+            if (joint.getType().equals(type)) {
+                return joint;
+            }
+        }
+        return null;
+    }
+
+    private void setLocations(FeedConnectJobInfo cInfo) {
+        JobSpecification jobSpec = cInfo.getSpec();
+
+        List<OperatorDescriptorId> collectOperatorIds = new ArrayList<>();
+        List<OperatorDescriptorId> computeOperatorIds = new ArrayList<>();
+        List<OperatorDescriptorId> storageOperatorIds = new ArrayList<>();
+
+        Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            IOperatorDescriptor actualOp;
+            if (opDesc instanceof FeedMetaOperatorDescriptor) {
+                actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
+            } else {
+                actualOp = opDesc;
+            }
+
+            if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
+                AlgebricksMetaOperatorDescriptor op = (AlgebricksMetaOperatorDescriptor) actualOp;
+                IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
+                boolean computeOp = false;
+                for (IPushRuntimeFactory rf : runtimeFactories) {
+                    if (rf instanceof AssignRuntimeFactory) {
+                        IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0);
+                        IOperatorDescriptor sourceOp =
+                                jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId()).getLeft().getLeft();
+                        if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+                            computeOp = true;
+                            break;
+                        }
+                    }
+                }
+                if (computeOp) {
+                    computeOperatorIds.add(entry.getKey());
+                }
+            } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                storageOperatorIds.add(entry.getKey());
+            } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
+                collectOperatorIds.add(entry.getKey());
+            }
+        }
+
+        try {
+            IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+            JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+            List<String> collectLocations = new ArrayList<>();
+            for (OperatorDescriptorId collectOpId : collectOperatorIds) {
+                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId);
+                int nOperatorInstances = operatorLocations.size();
+                for (int i = 0; i < nOperatorInstances; i++) {
+                    collectLocations.add(operatorLocations.get(i));
+                }
+            }
+
+            List<String> computeLocations = new ArrayList<>();
+            for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
+                if (operatorLocations != null) {
+                    int nOperatorInstances = operatorLocations.size();
+                    for (int i = 0; i < nOperatorInstances; i++) {
+                        computeLocations.add(operatorLocations.get(i));
+                    }
+                } else {
+                    computeLocations.clear();
+                    computeLocations.addAll(collectLocations);
+                }
+            }
+
+            List<String> storageLocations = new ArrayList<>();
+            for (OperatorDescriptorId storageOpId : storageOperatorIds) {
+                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
+                if (operatorLocations == null) {
+                    continue;
+                }
+                int nOperatorInstances = operatorLocations.size();
+                for (int i = 0; i < nOperatorInstances; i++) {
+                    storageLocations.add(operatorLocations.get(i));
+                }
+            }
+            cInfo.setCollectLocations(collectLocations);
+            cInfo.setComputeLocations(computeLocations);
+            cInfo.setStorageLocations(storageLocations);
+
+        } catch (Exception e) {
+            LOGGER.error("Error while setting feed active locations", e);
+        }
+
+    }
+
+    public synchronized void registerFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+        subscribers.add(subscriber);
+    }
+
+    public void deregisterFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+        subscribers.remove(subscriber);
+    }
+
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        boolean active = false;
+        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
+        if (cInfo != null) {
+            active = cInfo.getState().equals(ActivityState.ACTIVE);
+        }
+        if (active) {
+            registerFeedEventSubscriber(eventSubscriber);
+        }
+        return active;
+    }
+
+    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
+        return connectJobInfos.get(connectionId);
+    }
+
+    private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws ACIDException {
+        // set locations of feed sub-operations (intake, compute, store)
+        setLocations(cInfo);
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+        // activate joints
+        List<IFeedJoint> joints = pair.second;
+        for (IFeedJoint joint : joints) {
+            if (joint.getProvider().equals(cInfo.getConnectionId())) {
+                joint.setState(State.ACTIVE);
+                if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
+                    cInfo.setComputeFeedJoint(joint);
+                }
+            }
+        }
+        cInfo.setState(ActivityState.ACTIVE);
+    }
+
+    public synchronized boolean isConnectedToDataset(String datasetName) {
+        for (FeedConnectionId connection : connectJobInfos.keySet()) {
+            if (connection.getDatasetName().equals(datasetName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public FeedConnectionId[] getConnections() {
+        return connectJobInfos.keySet().toArray(new FeedConnectionId[connectJobInfos.size()]);
+    }
+
+    public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
+        return isFeedPointAvailable(feedJointKey);
+    }
+
+    @Override
+    public boolean isEntityActive() {
+        return !jobs.isEmpty();
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    public IFeedJoint getSourceFeedJoint() {
+        return sourceFeedJoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java
deleted file mode 100644
index 3145d72..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a data feed.
- */
-public class FeedId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String dataverse;
-    private final String feedName;
-
-    public FeedId(String dataverse, String feedName) {
-        this.dataverse = dataverse;
-        this.feedName = feedName;
-    }
-
-    public String getDataverse() {
-        return dataverse;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !(o instanceof FeedId)) {
-            return false;
-        }
-        if (this == o || ((FeedId) o).getFeedName().equals(feedName) && ((FeedId) o).getDataverse().equals(dataverse)) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverse + "." + feedName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
index edbaf7c..2905bb2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedJointKey.java
@@ -18,8 +18,10 @@
  */
 package org.apache.asterix.external.feed.management;
 
+import java.io.Serializable;
 import java.util.List;
 
+import org.apache.asterix.active.EntityId;
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -28,13 +30,14 @@ import org.apache.commons.lang3.StringUtils;
  * constitute the feed. The feed joint acts as a network tap and allows the flowing data to be
  * routed to multiple paths.
  */
-public class FeedJointKey {
+public class FeedJointKey implements Serializable {
 
-    private final FeedId primaryFeedId;
+    private static final long serialVersionUID = 1L;
+    private final EntityId primaryFeedId;
     private final List<String> appliedFunctions;
     private final String stringRep;
 
-    public FeedJointKey(FeedId feedId, List<String> appliedFunctions) {
+    public FeedJointKey(EntityId feedId, List<String> appliedFunctions) {
         this.primaryFeedId = feedId;
         this.appliedFunctions = appliedFunctions;
         StringBuilder builder = new StringBuilder();
@@ -44,7 +47,7 @@ public class FeedJointKey {
         stringRep = builder.toString();
     }
 
-    public FeedId getFeedId() {
+    public EntityId getFeedId() {
         return primaryFeedId;
     }
 
@@ -68,8 +71,9 @@ public class FeedJointKey {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o)
+        if (this == o) {
             return true;
+        }
         if (o == null || !(o instanceof FeedJointKey)) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
deleted file mode 100644
index 62f4c6c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedManager {
-
-    private final Map<FeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
-
-    private final IFeedConnectionManager feedConnectionManager;
-
-    private final ConcurrentFramePool feedMemoryManager;
-
-    private final AsterixFeedProperties asterixFeedProperties;
-
-    private final String nodeId;
-
-    private final int frameSize;
-
-    public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize)
-            throws AsterixException, HyracksDataException {
-        this.nodeId = nodeId;
-        this.feedConnectionManager = new FeedConnectionManager(nodeId);
-        this.feedMemoryManager =
-                new ConcurrentFramePool(nodeId, feedProperties.getMemoryComponentGlobalBudget(), frameSize);
-        this.frameSize = frameSize;
-        this.asterixFeedProperties = feedProperties;
-        this.subscribableRuntimes = new ConcurrentHashMap<FeedRuntimeId, ISubscribableRuntime>();
-    }
-
-    public IFeedConnectionManager getFeedConnectionManager() {
-        return feedConnectionManager;
-    }
-
-    public ConcurrentFramePool getFeedMemoryManager() {
-        return feedMemoryManager;
-    }
-
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
-        FeedRuntimeId sid = subscribableRuntime.getRuntimeId();
-        if (!subscribableRuntimes.containsKey(sid)) {
-            subscribableRuntimes.put(sid, subscribableRuntime);
-        }
-    }
-
-    public void deregisterFeedSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
-        subscribableRuntimes.remove(subscribableRuntimeId);
-    }
-
-    public ISubscribableRuntime getSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
-        return subscribableRuntimes.get(subscribableRuntimeId);
-    }
-
-    @Override
-    public String toString() {
-        return "FeedManager " + "[" + nodeId + "]";
-    }
-
-    public AsterixFeedProperties getAsterixFeedProperties() {
-        return asterixFeedProperties;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java
deleted file mode 100644
index f7e98f7..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedRuntimeManager.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.runtime.FeedRuntime;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public class FeedRuntimeManager {
-
-    private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
-
-    private final FeedConnectionId connectionId;
-    private final IFeedConnectionManager connectionManager;
-    private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
-
-    private final ExecutorService executorService;
-
-    public FeedRuntimeManager(FeedConnectionId connectionId, IFeedConnectionManager feedConnectionManager) {
-        this.connectionId = connectionId;
-        this.feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
-        this.executorService = Executors.newCachedThreadPool();
-        this.connectionManager = feedConnectionManager;
-    }
-
-    public void close() throws IOException {
-        if (executorService != null) {
-            executorService.shutdownNow();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down executor service for :" + connectionId);
-            }
-        }
-    }
-
-    public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
-        return feedRuntimes.get(runtimeId);
-    }
-
-    public void registerFeedRuntime(FeedRuntimeId runtimeId, FeedRuntime feedRuntime) {
-        feedRuntimes.put(runtimeId, feedRuntime);
-    }
-
-    public synchronized void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
-        feedRuntimes.remove(runtimeId);
-        if (feedRuntimes.isEmpty()) {
-            connectionManager.deregisterFeed(connectionId);
-        }
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public Set<FeedRuntimeId> getFeedRuntimes() {
-        return feedRuntimes.keySet();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
index b0f7624..06aafdd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/EndFeedMessage.java
@@ -18,22 +18,24 @@
  */
 package org.apache.asterix.external.feed.message;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator involved in the feed pipeline.
+ * @deprecated A feed control message indicating the need to end the feed. This message is dispatched
+ *             to all locations that host an operator involved in the feed pipeline.
+ *             Instead, use IMessageBroker messages
  */
+@Deprecated
 public class EndFeedMessage extends FeedMessage {
 
     private static final long serialVersionUID = 1L;
 
-    private final FeedId sourceFeedId;
+    private final EntityId sourceFeedId;
 
     private final FeedConnectionId connectionId;
 
@@ -48,7 +50,7 @@ public class EndFeedMessage extends FeedMessage {
         DISCONTINUE_SOURCE
     }
 
-    public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, FeedId sourceFeedId,
+    public EndFeedMessage(FeedConnectionId connectionId, FeedRuntimeType sourceRuntimeType, EntityId sourceFeedId,
             boolean completeDisconnection, EndMessageType endMessageType) {
         super(MessageType.END);
         this.connectionId = connectionId;
@@ -67,7 +69,7 @@ public class EndFeedMessage extends FeedMessage {
         return sourceRuntimeType;
     }
 
-    public FeedId getSourceFeedId() {
+    public EntityId getSourceFeedId() {
         return sourceFeedId;
     }
 
@@ -84,7 +86,7 @@ public class EndFeedMessage extends FeedMessage {
         JSONObject obj = new JSONObject();
         obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
         obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getEntityName());
         obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
         return obj;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
index f2b354b..4f57fb5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessage.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.external.feed.message;
 
-import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.active.IActiveMessage;
 import org.apache.hyracks.api.dataflow.value.JSONSerializable;
 
 /**
  * A control message that can be sent to the runtime instance of a
  * feed's adapter.
  */
-public abstract class FeedMessage implements IFeedMessage, JSONSerializable {
+public abstract class FeedMessage implements IActiveMessage, JSONSerializable {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
deleted file mode 100644
index 49b23ed..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedPartitionStartMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final FeedId feedId;
-    private final JobId jobId;
-
-    public FeedPartitionStartMessage(FeedId feedId, JobId jobId) {
-        this.feedId = feedId;
-        this.jobId = jobId;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.FEED_PROVIDER_READY;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/973a0d34/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index b604db5..f2f0747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -22,9 +22,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.log4j.Logger;
 
 /**
@@ -34,7 +34,7 @@ public class AdapterRuntimeManager {
 
     private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
 
-    private final FeedId feedId;                    // (dataverse-feed)
+    private final EntityId feedId;                    // (dataverse-feed)
 
     private final FeedAdapter feedAdapter;         // The adapter
 
@@ -49,7 +49,7 @@ public class AdapterRuntimeManager {
     private volatile boolean done = false;
     private volatile boolean failed = false;
 
-    public AdapterRuntimeManager(FeedId feedId, FeedAdapter feedAdapter, DistributeFeedFrameWriter writer,
+    public AdapterRuntimeManager(EntityId feedId, FeedAdapter feedAdapter, DistributeFeedFrameWriter writer,
             int partition) {
         this.feedId = feedId;
         this.feedAdapter = feedAdapter;
@@ -86,7 +86,7 @@ public class AdapterRuntimeManager {
         }
     }
 
-    public FeedId getFeedId() {
+    public EntityId getFeedId() {
         return feedId;
     }