Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/04/08 18:36:23 UTC

[1/2] incubator-asterixdb git commit: Asterix-1389 Fix Deadlocks in Feed Connections

Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master db831a455 -> fd0147101


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
index 7b74ef9..0dd87d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -22,29 +22,19 @@ import org.apache.asterix.external.feed.watch.FeedJobInfo;
 
 public class FeedOperationCounter {
     private FeedJobInfo feedJobInfo;
-    private int providersCount;
-    private int jobsCount;
+    private int partitionCount;
     private boolean failedIngestion = false;
 
-    public FeedOperationCounter(int providersCount, int jobsCount) {
-        this.providersCount = providersCount;
-        this.jobsCount = jobsCount;
+    public FeedOperationCounter(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
-    public int getProvidersCount() {
-        return providersCount;
+    public int getPartitionCount() {
+        return partitionCount;
     }
 
-    public void setProvidersCount(int providersCount) {
-        this.providersCount = providersCount;
-    }
-
-    public int getJobsCount() {
-        return jobsCount;
-    }
-
-    public void setJobsCount(int jobsCount) {
-        this.jobsCount = jobsCount;
+    public void setPartitionCount(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
     public boolean isFailedIngestion() {
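
This refactor collapses the separate provider and job counters into a
single partition count: the notification handler decrements it once per
intake partition that reports in, and the intake becomes active only when
the count reaches zero. A hedged sketch of that intended use, mirroring
the handlePartitionStart logic shown later in this mail (variable names
here are illustrative, not from the commit):

    // One counter per feed pipeline, initialized to the number of intake partitions.
    FeedOperationCounter counter = new FeedOperationCounter(numIntakePartitions);

    // Each intake partition sends one start message; decrement until all arrive.
    counter.setPartitionCount(counter.getPartitionCount() - 1);
    if (counter.getPartitionCount() == 0) {
        // Every intake partition has started: mark the intake joint/job ACTIVE
        // and notify subscribers with FEED_INTAKE_STARTED.
    }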

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
index 0c8724e..ad3c1c9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
@@ -27,7 +27,8 @@ public interface IFeedLifecycleEventSubscriber {
         FEED_COLLECT_STARTED,
         FEED_INTAKE_FAILURE,
         FEED_COLLECT_FAILURE,
-        FEED_ENDED
+        FEED_INTAKE_ENDED,
+        FEED_COLLECT_ENDED
     }
 
     public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
index 28b713e..448ea47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
@@ -45,6 +45,6 @@ public interface IFeedLifecycleListener extends IJobLifecycleListener, IClusterE
 
     public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
 
-    boolean isFeedConnectionActive(FeedConnectionId connectionId);
+    boolean isFeedConnectionActive(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber eventSubscriber);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
new file mode 100644
index 0000000..49b23ed
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+
+public class FeedPartitionStartMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final FeedId feedId;
+    private final JobId jobId;
+
+    public FeedPartitionStartMessage(FeedId feedId, JobId jobId) {
+        this.feedId = feedId;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.FEED_PROVIDER_READY;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}
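
FeedPartitionStartMessage replaces FeedProviderReadyMessage (deleted next)
and reuses the existing FEED_PROVIDER_READY application-message type. The
matching CCMessageBroker change is listed in the diffstat of part [2/2] but
not shown in this mail, so the dispatch below is a hedged sketch: the switch
arm is an assumption, while notifyPartitionStart(FeedId, JobId) is confirmed
by the FeedLifecycleListener diff further down.

    // Inside the CC message broker's receive path (sketch, not the committed code):
    case FEED_PROVIDER_READY:
        FeedPartitionStartMessage msg = (FeedPartitionStartMessage) message;
        FeedLifecycleListener.INSTANCE.notifyPartitionStart(msg.getFeedId(), msg.getJobId());
        break;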

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
deleted file mode 100644
index 4c81c5b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedProviderReadyMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final FeedId feedId;
-    private final JobId jobId;
-
-    public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
-        this.feedId = feedId;
-        this.jobId = jobId;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.FEED_PROVIDER_READY;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
index 3e42169..b69a7b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -36,6 +36,7 @@ public class FeedConnectJobInfo extends FeedJobInfo {
     private List<String> collectLocations;
     private List<String> computeLocations;
     private List<String> storageLocations;
+    private int partitionStarts = 0;
 
     public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
             IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
@@ -91,4 +92,12 @@ public class FeedConnectJobInfo extends FeedJobInfo {
         this.computeFeedJoint = computeFeedJoint;
     }
 
+    public void partitionStart() {
+        partitionStarts++;
+    }
+
+    public boolean collectionStarted() {
+        return partitionStarts == collectLocations.size();
+    }
+
 }
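
The two new methods implement a simple barrier: each collect partition
reports in once, and the connection counts as started only after every
collect location has done so. A sketch of the intended call pattern,
mirroring the handlePartitionStart case shown later in this mail:

    cInfo.partitionStart();            // one collect partition reported PARTITION_START
    if (cInfo.collectionStarted()) {   // true once partitionStarts == collectLocations.size()
        // All collect partitions are up: fire FEED_COLLECT_STARTED to waiting subscribers.
    }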

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 87e1edb..178d2d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -34,6 +34,7 @@ import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
@@ -85,6 +86,10 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
             switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
                 case INTAKE:
                     handleCompleteConnection();
+                    // Notify CC that Collection started
+                    ctx.sendApplicationMessageToCC(
+                            new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()),
+                            null);
                     break;
                 case COMPUTE:
                     handlePartialConnection();
@@ -93,7 +98,6 @@ public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOp
                     throw new IllegalStateException("Invalid source type "
                             + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
             }
-
             State state = collectRuntime.waitTillCollectionOver();
             if (state.equals(State.FINISHED)) {
                 feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 6771010..cd20900 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,7 +36,7 @@ import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -117,7 +117,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
                         adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-                ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+                ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
                         null);
                 feedFrameWriter.open();
             } else {


[2/2] incubator-asterixdb git commit: Asterix-1389 Fix Deadlocks in Feed Connections

Posted by am...@apache.org.
Asterix-1389 Fix Deadlocks in Feed Connections

This change makes each of the feed connect and feed
disconnect statements complete as an atomic operation.
Previously, we assumed that once the intake was ready on all
nodes and the connect job had started, the connect was
complete. That is not true: for the connect to be complete,
the connect job must have subscribed to the intake on all
intake nodes. Likewise, the disconnect statement should not
return until the connect job terminates.

Change-Id: Ib2778b4d7f156c7e06ac9f561a26783c4933a22c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/792
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/fd014710
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/fd014710
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/fd014710

Branch: refs/heads/master
Commit: fd0147101de9bdfa5d35b211b51b7fbfd6aa5618
Parents: db831a4
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Fri Apr 8 19:04:31 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Apr 8 09:29:35 2016 -0700

----------------------------------------------------------------------
 .../external/FeedJobNotificationHandler.java    | 131 ++++---
 .../app/external/FeedLifecycleListener.java     |  16 +-
 .../asterix/aql/translator/QueryTranslator.java | 339 ++++++++++---------
 .../asterix/messaging/CCMessageBroker.java      |   6 +-
 .../external/feed/api/FeedOperationCounter.java |  24 +-
 .../feed/api/IFeedLifecycleEventSubscriber.java |   3 +-
 .../feed/api/IFeedLifecycleListener.java        |   2 +-
 .../feed/message/FeedPartitionStartMessage.java |  48 +++
 .../feed/message/FeedProviderReadyMessage.java  |  48 ---
 .../external/feed/watch/FeedConnectJobInfo.java |   9 +
 .../FeedCollectOperatorNodePushable.java        |   6 +-
 .../FeedIntakeOperatorNodePushable.java         |   4 +-
 12 files changed, 312 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
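
The commit message above describes the fix at a high level; the diffs below
implement it. As a hedged sketch of the connect-side wait (the QueryTranslator
changes are only partially shown in this mail, so the subscriber wiring below
is an assumption rather than the committed code; only assertEvent() and the
FEED_COLLECT_STARTED event are confirmed by the diffs):

    // Sketch: make the connect statement block until the collect job has
    // subscribed to the intake on all intake nodes.
    IFeedLifecycleEventSubscriber subscriber = new FeedLifecycleEventSubscriber(); // concrete class assumed
    FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(connectionId, subscriber); // wiring assumed
    JobUtils.runJob(hcc, connectJobSpec, false); // third flag's wait semantics assumed
    // Block until every collect partition has reported PARTITION_START; the
    // handler fires FEED_COLLECT_STARTED only once the partition count is complete.
    subscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);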


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index 04f20fb..a143578 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -81,7 +81,6 @@ public class FeedJobNotificationHandler implements Runnable {
 
     private final LinkedBlockingQueue<FeedEvent> inbox;
     private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
-
     private final Map<JobId, FeedJobInfo> jobInfos;
     private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
     private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
@@ -100,7 +99,8 @@ public class FeedJobNotificationHandler implements Runnable {
 
     @Override
     public void run() {
-        FeedEvent event;
+        FeedEvent event = null;
+        Thread.currentThread().setName("FeedJobNotificationHandler");
         while (true) {
             try {
                 event = inbox.take();
@@ -111,8 +111,8 @@ public class FeedJobNotificationHandler implements Runnable {
                     case JOB_FINISH:
                         handleJobFinishEvent(event);
                         break;
-                    case PROVIDER_READY:
-                        handleProviderReady(event);
+                    case PARTITION_START:
+                        handlePartitionStart(event);
                         break;
                     default:
                         LOGGER.log(Level.WARNING, "Unknown Feed Event");
@@ -121,7 +121,6 @@ public class FeedJobNotificationHandler implements Runnable {
             } catch (Exception e) {
                 e.printStackTrace();
             }
-
         }
     }
 
@@ -162,12 +161,11 @@ public class FeedJobNotificationHandler implements Runnable {
 
         if (feedJointsOnPipeline == null) {
             feedJointsOnPipeline = new Pair<FeedOperationCounter, List<IFeedJoint>>(
-                    new FeedOperationCounter(numOfPrividers, 1), new ArrayList<IFeedJoint>());
+                    new FeedOperationCounter(numOfPrividers), new ArrayList<IFeedJoint>());
             feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
             feedJointsOnPipeline.second.add(feedJoint);
         } else {
             if (!feedJointsOnPipeline.second.contains(feedJoint)) {
-                feedJointsOnPipeline.first.setJobsCount(feedJointsOnPipeline.first.getJobsCount() + 1);
                 feedJointsOnPipeline.second.add(feedJoint);
             } else {
                 throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
@@ -175,7 +173,7 @@ public class FeedJobNotificationHandler implements Runnable {
         }
     }
 
-    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec)
+    public synchronized void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec)
             throws HyracksDataException {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
@@ -207,7 +205,7 @@ public class FeedJobNotificationHandler implements Runnable {
         }
     }
 
-    public void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
+    public synchronized void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
             JobSpecification jobSpec, Map<String, String> feedPolicy) {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
@@ -242,7 +240,7 @@ public class FeedJobNotificationHandler implements Runnable {
 
     }
 
-    public void deregisterFeedIntakeJob(JobId jobId) {
+    public synchronized void deregisterFeedIntakeJob(JobId jobId) {
         if (jobInfos.get(jobId) == null) {
             throw new IllegalStateException(" Feed Intake job not registered ");
         }
@@ -266,7 +264,7 @@ public class FeedJobNotificationHandler implements Runnable {
 
     }
 
-    private void handleJobStartEvent(FeedEvent message) throws Exception {
+    private synchronized void handleJobStartEvent(FeedEvent message) throws Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -279,7 +277,7 @@ public class FeedJobNotificationHandler implements Runnable {
 
     }
 
-    private void handleJobFinishEvent(FeedEvent message) throws Exception {
+    private synchronized void handleJobFinishEvent(FeedEvent message) throws Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -297,14 +295,27 @@ public class FeedJobNotificationHandler implements Runnable {
         }
     }
 
-    private void handleProviderReady(FeedEvent message) {
-        FeedIntakeInfo jobInfo = (FeedIntakeInfo) jobInfos.get(message.jobId);
-        Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId);
-        feedCounter.first.setProvidersCount(feedCounter.first.getProvidersCount() - 1);;
-        if (feedCounter.first.getProvidersCount() == 0) {
-            jobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
-            jobInfo.setState(FeedJobState.ACTIVE);
-            notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+    private synchronized void handlePartitionStart(FeedEvent message) {
+        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+        switch (jobInfo.getJobType()) {
+            case FEED_CONNECT:
+                ((FeedConnectJobInfo) jobInfo).partitionStart();
+                if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
+                    notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                }
+                break;
+            case INTAKE:
+                Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId);
+                    feedCounter.first.setPartitionCount(feedCounter.first.getPartitionCount() - 1);
+                if (feedCounter.first.getPartitionCount() == 0) {
+                    ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
+                    jobInfo.setState(FeedJobState.ACTIVE);
+                    notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+                }
+                break;
+            default:
+                break;
+
         }
     }
 
@@ -335,9 +346,7 @@ public class FeedJobNotificationHandler implements Runnable {
     private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
         // set locations of feed sub-operations (intake, compute, store)
         setLocations(cInfo);
-
         Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
-        pair.first.setJobsCount(pair.first.getJobsCount() + 1);
         // activate joints
         List<IFeedJoint> joints = pair.second;
         for (IFeedJoint joint : joints) {
@@ -351,8 +360,6 @@ public class FeedJobNotificationHandler implements Runnable {
         cInfo.setState(FeedJobState.ACTIVE);
         // register activity in metadata
         registerFeedActivity(cInfo);
-        // notify event listeners
-        notifyFeedEventSubscribers(cInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
     }
 
     private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
@@ -376,6 +383,9 @@ public class FeedJobNotificationHandler implements Runnable {
                     subscriber.handleFeedEvent(event);
                 }
             }
+            if (event == FeedLifecycleEvent.FEED_COLLECT_ENDED) {
+                eventSubscribers.remove(connId);
+            }
         }
     }
 
@@ -416,12 +426,17 @@ public class FeedJobNotificationHandler implements Runnable {
         return activeConnections;
     }
 
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        boolean active = false;
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
         if (cInfo != null) {
-            return cInfo.getState().equals(FeedJobState.ACTIVE);
+            active = cInfo.getState().equals(FeedJobState.ACTIVE);
         }
-        return false;
+        if (active) {
+            registerFeedEventSubscriber(connectionId, eventSubscriber);
+        }
+        return active;
     }
 
     public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
@@ -433,28 +448,26 @@ public class FeedJobNotificationHandler implements Runnable {
         return connectJobInfos.get(connectionId).getState();
     }
 
-    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message) throws Exception {
+    private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message)
+            throws Exception {
         IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
         JobInfo info = hcc.getJobInfo(message.jobId);
         JobStatus status = info.getStatus();
         FeedId feedId = intakeInfo.getFeedId();
         Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
-        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
         if (status.equals(JobStatus.FAILURE)) {
             pair.first.setFailedIngestion(true);
         }
         // remove feed joints
         deregisterFeedIntakeJob(message.jobId);
-
         // notify event listeners
-        if (pair.first.getJobsCount() == 0) {
-            feedPipeline.remove(feedId);
-            notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion()
-                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
-        }
+        feedPipeline.remove(feedId);
+        intakeJobInfos.remove(feedId);
+        notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
+                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
     }
 
-    private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
+    private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
         FeedConnectionId connectionId = cInfo.getConnectionId();
 
         IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
@@ -462,8 +475,6 @@ public class FeedJobNotificationHandler implements Runnable {
         JobStatus status = info.getStatus();
         boolean failure = status != null && status.equals(JobStatus.FAILURE);
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
-
-        boolean removeJobHistory = !failure;
         boolean retainSubsription = cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
                 || (failure && fpa.continueOnHardwareFailure());
 
@@ -474,27 +485,15 @@ public class FeedJobNotificationHandler implements Runnable {
                 LOGGER.info(
                         "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
             }
-            removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
         }
 
-        if (removeJobHistory) {
-            connectJobInfos.remove(connectionId);
-            jobInfos.remove(cInfo.getJobId());
-            feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
-        }
+        connectJobInfos.remove(connectionId);
+        jobInfos.remove(cInfo.getJobId());
+        feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
         deregisterFeedActivity(cInfo);
-
-        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline
-                .get(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
-        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
-        if (pair.first.getJobsCount() == 0) {
-            notifyFeedEventSubscribers(pair.first.getFeedJobInfo(), pair.first.isFailedIngestion()
-                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
-            feedPipeline.remove(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
-        }
-
         // notify event listeners
-        FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
+        FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE
+                : FeedLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(cInfo, event);
     }
 
@@ -547,23 +546,6 @@ public class FeedJobNotificationHandler implements Runnable {
         }
     }
 
-    public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId()).second;
-
-        IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
-        List<FeedConnectionId> all = sourceJoint.getReceivers();
-        boolean removeSourceJoint = all.size() < 2;
-        if (removeSourceJoint) {
-            feedJoints.remove(sourceJoint);
-        }
-
-        IFeedJoint computeJoint = cInfo.getComputeFeedJoint();
-        if (computeJoint != null && computeJoint.getReceivers().size() < 2) {
-            feedJoints.remove(computeJoint);
-        }
-    }
-
     public boolean isRegisteredFeedJob(JobId jobId) {
         return jobInfos.get(jobId) != null;
     }
@@ -594,7 +576,8 @@ public class FeedJobNotificationHandler implements Runnable {
         return connectJobInfos.get(connectionId).getJobId();
     }
 
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+    public synchronized void registerFeedEventSubscriber(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber subscriber) {
         List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
         if (subscribers == null) {
             subscribers = new ArrayList<IFeedLifecycleEventSubscriber>();
@@ -610,7 +593,7 @@ public class FeedJobNotificationHandler implements Runnable {
         }
     }
 
-    //============================
+    // ============================
 
     public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
         List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
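
For reference, a minimal subscriber satisfying the contract the handler above
relies on (an illustrative sketch, not the project's FeedLifecycleEventSubscriber;
only the interface methods, event names, and the fact that the handler calls
handleFeedEvent come from this commit; the exception package path is assumed):

    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.asterix.common.exceptions.AsterixException; // package path assumed

    public class BlockingEventSubscriber implements IFeedLifecycleEventSubscriber {
        private final LinkedBlockingQueue<FeedLifecycleEvent> inbox =
                new LinkedBlockingQueue<FeedLifecycleEvent>();

        @Override
        public void handleFeedEvent(FeedLifecycleEvent event) {
            inbox.add(event); // called by notifyFeedEventSubscribers on the CC
        }

        @Override
        public void assertEvent(FeedLifecycleEvent expected)
                throws AsterixException, InterruptedException {
            while (true) {
                FeedLifecycleEvent event = inbox.take(); // block until notified
                if (event == expected) {
                    return;
                }
                if (event == FeedLifecycleEvent.FEED_INTAKE_FAILURE
                        || event == FeedLifecycleEvent.FEED_COLLECT_FAILURE) {
                    throw new AsterixException("Feed failed while waiting for " + expected);
                }
            }
        }
    }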

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index d7129b8..161c863 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -112,14 +112,14 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
             jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_START));
         }
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
             jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_FINISH));
         } else {
@@ -185,7 +185,7 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
         public enum EventKind {
             JOB_START,
             JOB_FINISH,
-            PROVIDER_READY
+            PARTITION_START
         }
 
         public EventKind eventKind;
@@ -446,12 +446,12 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
     }
 
     @Override
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        return feedJobNotificationHandler.isFeedConnectionActive(connectionId, eventSubscriber);
     }
 
     public void reportPartialDisconnection(FeedConnectionId connectionId) {
-        feedJobNotificationHandler.removeFeedJointsPostPipelineTermination(connectionId);
     }
 
     public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
@@ -503,8 +503,8 @@ public class FeedLifecycleListener implements IFeedLifecycleListener {
         return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
     }
 
-    public void notifyProviderReady(FeedId feedId, JobId jobId) {
-        jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PROVIDER_READY, feedId));
+    public synchronized void notifyPartitionStart(FeedId feedId, JobId jobId) {
+        jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PARTITION_START, feedId));
     }
 
 }
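
Likewise, a hedged sketch of the disconnect-side wait this listener change
enables (QueryTranslator wiring again assumed). isFeedConnectionActive now
checks activity and registers the subscriber in one synchronized step, which
closes the race where the collect job could finish between a separate
activity check and a later registration:

    // Atomic check-and-register, then wait for the connect job to terminate.
    IFeedLifecycleEventSubscriber subscriber = new BlockingEventSubscriber(); // sketch class from above
    if (FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId, subscriber)) {
        JobUtils.runJob(hcc, disconnectJobSpec, false); // third flag's wait semantics assumed
        // Return only after the collect job finishes and the handler fires
        // FEED_COLLECT_ENDED (which also removes this subscriber).
        subscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
    }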

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 3786413..132c8c9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -609,9 +609,9 @@ public class QueryTranslator extends AbstractLangTranslator {
                     }
                     if (compactionPolicy == null) {
                         if (filterField != null) {
-                            //If the dataset has a filter and the user didn't specify a merge
-                            //policy, then we will pick the
-                            //correlated-prefix as the default merge policy.
+                            // If the dataset has a filter and the user didn't specify a merge
+                            // policy, then we will pick the
+                            // correlated-prefix as the default merge policy.
                             compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
                             compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                         }
@@ -632,12 +632,12 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             }
 
-            //#. initialize DatasetIdFactory if it is not initialized.
+            // #. initialize DatasetIdFactory if it is not initialized.
             if (!DatasetIdFactory.isInitialized()) {
                 DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
             }
 
-            //#. add a new dataset with PendingAddOp
+            // #. add a new dataset with PendingAddOp
             dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName,
                     metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties,
                     datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
@@ -650,21 +650,21 @@ public class QueryTranslator extends AbstractLangTranslator {
                 JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
                         metadataProvider);
 
-                //#. make metadataTxn commit before calling runJob.
+                // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. runJob
+                // #. runJob
                 JobUtils.runJob(hcc, jobSpec, true);
 
-                //#. begin new metadataTxn
+                // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
             MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
             dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
@@ -676,11 +676,11 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
 
-                //#. execute compensation operations
-                //remove the index in NC
-                //[Notice]
-                //As long as we updated(and committed) metadata, we should remove any effect of the job
-                //because an exception occurs during runJob.
+                // #. execute compensation operations
+                // remove the index in NC
+                // [Notice]
+                // As long as we updated(and committed) metadata, we should remove any effect of the job
+                // because an exception occurs during runJob.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -697,7 +697,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     }
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -820,7 +820,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         String indexName = null;
         JobSpecification spec = null;
         Dataset ds = null;
-        //For external datasets
+        // For external datasets
         ArrayList<ExternalFile> externalFilesSnapshot = null;
         boolean firstExternalDatasetIndex = false;
         boolean filesIndexReplicated = false;
@@ -904,10 +904,10 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
 
-            //Checks whether a user is trying to create an inverted secondary index on a dataset
-            //with a variable-length primary key.
-            //Currently, we do not support this. Therefore, as a temporary solution, we print an
-            //error message and stop.
+            // Checks whether a user is trying to create an inverted secondary index on a dataset
+            // with a variable-length primary key.
+            // Currently, we do not support this. Therefore, as a temporary solution, we print an
+            // error message and stop.
             if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -917,7 +917,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     IAType keyType = aRecordType.getSubFieldType(partitioningKey);
                     ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 
-                    //If it is not a fixed length
+                    // If it is not a fixed length
                     if (typeTrait.getFixedLength() < 0) {
                         throw new AlgebricksException("The keyword or ngram index -" + indexName
                                 + " cannot be created on the dataset -" + datasetName
@@ -930,27 +930,27 @@ public class QueryTranslator extends AbstractLangTranslator {
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 validateIfResourceIsActiveInFeed(dataverseName, datasetName);
             } else {
-                //External dataset
-                //Check if the dataset is indexible
+                // External dataset
+                // Check if the dataset is indexible
                 if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
                     throw new AlgebricksException(
                             "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
                                     + " Adapter can't be indexed");
                 }
-                //Check if the name of the index is valid
+                // Check if the name of the index is valid
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
                     throw new AlgebricksException("external dataset index name is invalid");
                 }
 
-                //Check if the files index exist
+                // Check if the files index exists
                 filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                         datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                 firstExternalDatasetIndex = (filesIndex == null);
-                //Lock external dataset
+                // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
                 datasetLocked = true;
                 if (firstExternalDatasetIndex) {
-                    //Verify that no one has created an index before we acquire the lock
+                    // Verify that no one has created an index before we acquire the lock
                     filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                             dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                     if (filesIndex != null) {
@@ -960,20 +960,20 @@ public class QueryTranslator extends AbstractLangTranslator {
                     }
                 }
                 if (firstExternalDatasetIndex) {
-                    //Get snapshot from External File System
+                    // Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
-                    //Add an entry for the files index
+                    // Add an entry for the files index
                     filesIndex = new Index(dataverseName, datasetName,
                             ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
                             IMetadataEntity.PENDING_ADD_OP);
                     MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                    //Add files to the external files index
+                    // Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
                     }
-                    //This is the first index for the external dataset, replicate the files index
+                    // This is the first index for the external dataset, replicate the files index
                     spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
                             metadataProvider, true);
                     if (spec == null) {
@@ -985,7 +985,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
 
-            //check whether there exists another enforced index on the same field
+            // check whether there exists another enforced index on the same field
             if (stmtCreateIndex.isEnforced()) {
                 List<Index> indexes = MetadataManager.INSTANCE
                         .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
@@ -999,7 +999,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
 
-            //#. add a new index with PendingAddOp
+            // #. add a new index with PendingAddOp
             Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
                     keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
                     false, IMetadataEntity.PENDING_ADD_OP);
@@ -1011,7 +1011,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                         Lists.newArrayList(index));
             }
 
-            //#. prepare to create the index artifact in NC.
+            // #. prepare to create the index artifact in NC.
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
                     index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
@@ -1025,14 +1025,14 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-            //#. create the index artifact in NC.
+            // #. create the index artifact in NC.
             JobUtils.runJob(hcc, spec, true);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. load data into the index in NC.
+            // #. load data into the index in NC.
             cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
                     index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
                     index.getGramLength(), index.getIndexType());
@@ -1042,24 +1042,24 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             JobUtils.runJob(hcc, spec, true);
 
-            //#. begin new metadataTxn
+            // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+            // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
             MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                     indexName);
             index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            //add another new files index with PendingNoOp after deleting the index with
-            //PendingAddOp
+            // add another new files index with PendingNoOp after deleting the index with
+            // PendingAddOp
             if (firstExternalDatasetIndex) {
                 MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                         filesIndex.getIndexName());
                 filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                //update transaction timestamp
+                // update transaction timestamp
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
             }
@@ -1069,7 +1069,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
-            //If files index was replicated for external dataset, it should be cleaned up on NC side
+            // If files index was replicated for external dataset, it should be cleaned up on NC side
             if (filesIndexReplicated) {
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
@@ -1090,8 +1090,8 @@ public class QueryTranslator extends AbstractLangTranslator {
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the index in NC
+                // #. execute compensation operations
+                // remove the index in NC
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1114,7 +1114,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        //Drop External Files from metadata
+                        // Drop External Files from metadata
                         MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
@@ -1126,7 +1126,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        //Drop the files index from metadata
+                        // Drop the files index from metadata
                         MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                                 datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1138,7 +1138,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                                 + ") couldn't be removed from the metadata", e);
                     }
                 }
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1221,7 +1221,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
 
-            //# disconnect all feeds from any datasets in the dataverse.
+            // # disconnect all feeds from any datasets in the dataverse.
             List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
                     .getActiveFeedConnections(null);
             DisconnectFeedStatement disStmt = null;
@@ -1243,13 +1243,13 @@ public class QueryTranslator extends AbstractLangTranslator {
                                     + connection.getDatasetName() + ". Encountered exception " + exception);
                         }
                     }
-                    //prepare job to remove feed log storage
+                    // prepare job to remove feed log storage
                     jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
                             MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
                 }
             }
 
-            //#. prepare jobs which will drop corresponding datasets with indexes.
+            // #. prepare jobs which will drop corresponding datasets with indexes.
             List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
             for (int j = 0; j < datasets.size(); j++) {
                 String datasetName = datasets.get(j).getDatasetName();
@@ -1269,7 +1269,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                     jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 } else {
-                    //External dataset
+                    // External dataset
                     List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                             datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
@@ -1289,10 +1289,10 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-            //#. mark PendingDropOp on the dataverse record by
-            //first, deleting the dataverse record from the DATAVERSE_DATASET
-            //second, inserting the dataverse record with the PendingDropOp value into the
-            //DATAVERSE_DATASET
+            // #. mark PendingDropOp on the dataverse record by
+            // first, deleting the dataverse record from the DATAVERSE_DATASET
+            // second, inserting the dataverse record with the PendingDropOp value into the
+            // DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
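
The drop-and-re-add pair above is the metadata pending-op protocol: the record is flagged with PENDING_DROP_OP inside one metadata transaction, the physical drop jobs then run with no transaction open, and a second transaction deletes the record for good, so a crash in between leaves a recoverable marker instead of a dangling entity. A minimal sketch of the shape (runPhysicalDropJobs() is hypothetical; the MetadataManager calls mirror the hunks above):

    // Sketch of the pending-op drop protocol; runPhysicalDropJobs() is hypothetical.
    MetadataTransactionContext txn = MetadataManager.INSTANCE.beginTransaction();
    // 1. Flag the record: delete it, then re-add it carrying PENDING_DROP_OP.
    MetadataManager.INSTANCE.dropDataverse(txn, dataverseName);
    MetadataManager.INSTANCE.addDataverse(txn,
            new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
    MetadataManager.INSTANCE.commitTransaction(txn);
    // 2. Run the physical drop jobs while no metadata transaction is open.
    runPhysicalDropJobs();
    // 3. Finally, delete the flagged record in a fresh transaction.
    txn = MetadataManager.INSTANCE.beginTransaction();
    MetadataManager.INSTANCE.dropDataverse(txn, dataverseName);
    MetadataManager.INSTANCE.commitTransaction(txn);
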
@@ -1309,7 +1309,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. finally, delete the dataverse.
+            // #. finally, delete the dataverse.
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
                 activeDefaultDataverse = null;
@@ -1325,18 +1325,18 @@ public class QueryTranslator extends AbstractLangTranslator {
                     activeDefaultDataverse = null;
                 }
 
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove all the indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do not throw the exception; the metadata still needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
                     MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
@@ -1383,7 +1383,7 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                //prepare job spec(s) that would disconnect any active feeds involving the dataset.
+                // prepare job spec(s) that would disconnect any active feeds involving the dataset.
                 List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
                 if (feedConnections != null && !feedConnections.isEmpty()) {
                     for (FeedConnectionId connection : feedConnections) {
@@ -1394,14 +1394,14 @@ public class QueryTranslator extends AbstractLangTranslator {
                             LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
                                     + datasetName + " as dataset is being dropped");
                         }
-                        //prepare job to remove feed log storage
+                        // prepare job to remove feed log storage
                         jobsToExecute
                                 .add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx,
                                         connection.getFeedId().getDataverse(), connection.getFeedId().getFeedName())));
                     }
                 }
 
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the dataset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -1413,7 +1413,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                 jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1426,12 +1426,12 @@ public class QueryTranslator extends AbstractLangTranslator {
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //# disconnect the feeds
+                // #. disconnect the feeds
                 for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
                     JobUtils.runJob(hcc, p.first, true);
                 }
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1440,9 +1440,9 @@ public class QueryTranslator extends AbstractLangTranslator {
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             } else {
-                //External dataset
+                // External dataset
                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the dataset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
@@ -1457,7 +1457,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     }
                 }
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1469,7 +1469,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1481,9 +1481,9 @@ public class QueryTranslator extends AbstractLangTranslator {
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. finally, delete the dataset.
+            // #. finally, delete the dataset.
             MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-            //Drop the associated nodegroup
+            // Drop the associated nodegroup
             String nodegroup = ds.getNodeGroupName();
             if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
                 MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
@@ -1496,18 +1496,18 @@ public class QueryTranslator extends AbstractLangTranslator {
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove all the indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do not throw the exception; the metadata still needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
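
This compensation block (and its twins in the dataverse- and index-drop paths) deliberately swallows secondary failures: the replayed drop jobs may fail again, but the pending metadata record must still be scrubbed, so the secondary exception is attached to the primary one instead of being thrown. The pattern in isolation, as self-contained Java with hypothetical names:

    import java.util.List;

    // Compensate-and-suppress, distilled; names are hypothetical.
    static void compensate(Exception primary, List<Runnable> dropJobs, Runnable metadataCleanup) {
        try {
            for (Runnable job : dropJobs) {
                job.run();                 // replay the physical drop jobs
            }
        } catch (RuntimeException e2) {
            primary.addSuppressed(e2);     // record the failure, keep compensating
        }
        metadataCleanup.run();             // must run even if the jobs failed
    }
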
@@ -1542,7 +1542,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
 
         String indexName = null;
-        //For external index
+        // For external index
         boolean dropFilesIndex = false;
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
@@ -1581,18 +1581,18 @@ public class QueryTranslator extends AbstractLangTranslator {
                         throw new AlgebricksException("There is no index with this name " + indexName + ".");
                     }
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
                                 index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
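
Across these handlers, progress acts as a watermark: it is advanced to ADDED_PENDINGOP_RECORD_TO_METADATA only after the pending-op record is durably committed, and the catch-side compensation runs only past that point. Schematically (helper names hypothetical; the NO_PROGRESS initial value is assumed from context):

    // Watermark around the pending-op record; helpers are hypothetical.
    ProgressState progress = ProgressState.NO_PROGRESS;
    try {
        commitPendingOpRecord();           // metadata txn #1, as in the hunk above
        progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
        runDropJobs();                     // physical work, outside any metadata txn
        deletePendingOpRecord();           // metadata txn #2
    } catch (Exception e) {
        if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
            compensate(e);                 // replay the jobs, then scrub the record
        }
        throw e;
    }
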
@@ -1601,15 +1601,15 @@ public class QueryTranslator extends AbstractLangTranslator {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
             } else {
-                //External dataset
+                // External dataset
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (index == null) {
@@ -1622,21 +1622,21 @@ public class QueryTranslator extends AbstractLangTranslator {
                 } else if (ExternalIndexingOperations.isFileIndex(index)) {
                     throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
                 List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                         datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
-                    //only one index + the files index, we need to delete both of the indexes
+                    // only one index remains besides the files index; we need to delete both indexes
                     for (Index externalIndex : datasetIndexes) {
                         if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
                             cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             jobsToExecute.add(
                                     ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
-                            //#. mark PendingDropOp on the existing files index
+                            // #. mark PendingDropOp on the existing files index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             MetadataManager.INSTANCE.addIndex(mdTxnCtx,
@@ -1649,14 +1649,14 @@ public class QueryTranslator extends AbstractLangTranslator {
                     }
                 }
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
                                 index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1665,15 +1665,15 @@ public class QueryTranslator extends AbstractLangTranslator {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (dropFilesIndex) {
-                    //delete the files index too
+                    // delete the files index too
                     MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                             ExternalIndexingOperations.getFilesIndexName(datasetName));
                     MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
@@ -1688,18 +1688,18 @@ public class QueryTranslator extends AbstractLangTranslator {
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove all the indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do not throw the exception; the metadata still needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1953,11 +1953,11 @@ public class QueryTranslator extends AbstractLangTranslator {
             ICompiledDmlStatement stmt)
             throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
-        //Query Rewriting (happens under the same ongoing metadata transaction)
+        // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig);
 
-        //Query Compilation (happens under the same ongoing metadata transaction)
+        // Query Compilation (happens under the same ongoing metadata transaction)
         JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
                 reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
 
@@ -2178,7 +2178,8 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
 
-            if (FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId)) {
+            subscriberRegistered = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId, eventSubscriber);
+            if (subscriberRegistered) {
                 throw new AsterixException("Feed " + cfs.getFeedName() + " is already connected to dataset "
                         + cfs.getDatasetName().getValue());
             }
@@ -2186,14 +2187,13 @@ public class QueryTranslator extends AbstractLangTranslator {
             FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
                     mdTxnCtx);
 
-            //All Metadata checks have passed. Feed connect request is valid. //
+            // All metadata checks have passed; the feed connect request is valid.
 
             FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
             Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName,
                     feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
             FeedConnectionRequest connectionRequest = triple.first;
             boolean createFeedIntakeJob = triple.second;
-
             FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
             subscriberRegistered = true;
             if (createFeedIntakeJob) {
@@ -2202,8 +2202,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                         feedId.getFeedName());
                 Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
                         metadataProvider, policyAccessor);
-                //adapter configuration are valid at this stage
-                //register the feed joints (these are auto-de-registered)
+                // adapter configuration is valid at this stage
+                // register the feed joints (these are auto-de-registered)
                 int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length;
                 for (IFeedJoint fj : triple.third) {
                     FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, numOfPrividers);
@@ -2227,7 +2227,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = false;
             eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
+                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
             }
         } catch (Exception e) {
             if (bActiveTxn) {
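
Both assertEvent(...) calls above block the translator thread until FeedLifecycleListener delivers the requested lifecycle event, which is why the subscriber is registered before any job is submitted: a late registration could miss the event and hang forever, the kind of deadlock this change targets. A minimal latch-style sketch of such a subscriber (hypothetical; the real implementation is FeedLifecycleEventSubscriber, not shown here):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical blocking lifecycle-event subscriber.
    class SketchSubscriber {
        private final BlockingQueue<FeedLifecycleEvent> events = new LinkedBlockingQueue<>();

        // Invoked on the lifecycle listener's thread for each event.
        public void handleEvent(FeedLifecycleEvent event) {
            events.add(event);
        }

        // Blocks the caller until the expected event (or a failure) arrives.
        public void assertEvent(FeedLifecycleEvent expected) throws Exception {
            while (true) {
                FeedLifecycleEvent e = events.take();
                if (e == expected) {
                    return;
                }
                if (e == FeedLifecycleEvent.FEED_INTAKE_FAILURE
                        || e == FeedLifecycleEvent.FEED_COLLECT_FAILURE) {
                    throw new Exception("feed failed while waiting for " + expected);
                }
            }
        }
    }
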
@@ -2270,7 +2270,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
         if (!isFeedJointAvailable) {
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
-            if (sourceFeedJoint == null) { //the feed is currently not being ingested, i.e., it is unavailable.
+            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
                 connectionLocation = FeedRuntimeType.INTAKE;
                 FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
@@ -2290,8 +2290,8 @@ public class QueryTranslator extends AbstractLangTranslator {
                     functionsToApply.add(f);
                 }
             }
-            //register the compute feed point that represents the final output from the collection of
-            //functions that will be applied.
+            // register the compute feed point that represents the final output from the collection of
+            // functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
                 IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2341,7 +2341,6 @@ public class QueryTranslator extends AbstractLangTranslator {
         DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String datasetName = cfs.getDatasetName().getValue();
-
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2350,7 +2349,9 @@ public class QueryTranslator extends AbstractLangTranslator {
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
 
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
-        boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId);
+        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId,
+                eventSubscriber);
         if (!isFeedConnectionActive) {
             throw new AsterixException("Feed " + feed.getFeedId().getFeedName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
@@ -2377,7 +2378,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
                 FeedLifecycleListener.INSTANCE.reportPartialDisconnection(connectionId);
             }
-
+            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2463,7 +2464,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), itemTypeName);
 
-            //Prepare jobs to compact the datatset and its indexes
+            // Prepare jobs to compact the dataset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException(
@@ -2500,7 +2501,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            //#. run the jobs
+            // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
                 JobUtils.runJob(hcc, jobSpec, true);
             }
@@ -2548,9 +2549,9 @@ public class QueryTranslator extends AbstractLangTranslator {
                         ResultReader resultReader = new ResultReader(hcc, hdc);
                         resultReader.open(jobId, metadataProvider.getResultSetId());
 
-                        //In this case (the normal case), we don't use the
-                        //"response" JSONObject - just stream the results
-                        //to the "out" PrintWriter
+                        // In this case (the normal case), we don't use the
+                        // "response" JSONObject - just stream the results
+                        // to the "out" PrintWriter
                         if (sessionConfig.fmt() == OutputFormat.CSV
                                 && sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER)) {
                             ResultUtils.displayCSVHeader(metadataProvider.findOutputRecordType(), sessionConfig);
@@ -2579,7 +2580,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
-            //release external datasets' locks acquired during compilation of the query
+            // release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -2640,56 +2641,56 @@ public class QueryTranslator extends AbstractLangTranslator {
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
 
-            //Dataset exists ?
+            // Does the dataset exist?
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
-            //Dataset external ?
+            // Is the dataset external?
             if (ds.getDatasetType() != DatasetType.EXTERNAL) {
                 throw new AlgebricksException(
                         "dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset");
             }
-            //Dataset has indexes ?
+            // Does the dataset have indexes?
             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException("External dataset " + datasetName + " in dataverse " + dataverseName
                         + " doesn't have any index");
             }
 
-            //Record transaction time
+            // Record transaction time
             Date txnTime = new Date();
 
-            //refresh lock here
+            // refresh lock here
             ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds);
             lockAquired = true;
 
-            //Get internal files
+            // Get internal files
             metadataFiles = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds);
             deletedFiles = new ArrayList<ExternalFile>();
             addedFiles = new ArrayList<ExternalFile>();
             appendedFiles = new ArrayList<ExternalFile>();
 
-            //Compute delta
-            //Now we compare snapshot with external file system
+            // Compute delta
+            // Now we compare the snapshot with the external file system
             if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
                     appendedFiles)) {
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                //latch will be released in the finally clause
+                // latch will be released in the finally clause
                 return;
             }
 
-            //At this point, we know data has changed in the external file system, record
-            //transaction in metadata and start
+            // At this point, we know data has changed in the external file system; record
+            // the transaction in metadata and start the refresh.
             transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds);
             /*
              * Remove old dataset record and replace it with a new one
              */
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
 
-            //Add delta files to the metadata
+            // Add delta files to the metadata
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
@@ -2700,7 +2701,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            //Create the files index update job
+            // Create the files index update job
             spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles,
                     appendedFiles, metadataProvider);
 
@@ -2708,22 +2709,22 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.BEGIN;
 
-            //run the files update job
+            // run the files update job
             JobUtils.runJob(hcc, spec, true);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles,
                             addedFiles, appendedFiles, metadataProvider);
-                    //run the files update job
+                    // run the files update job
                     JobUtils.runJob(hcc, spec, true);
                 }
             }
 
-            //all index updates has completed successfully, record transaction state
+            // all index updates have completed successfully; record the transaction state
             spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, metadataProvider);
 
-            //Aquire write latch again -> start a transaction and record the decision to commit
+            // Acquire write latch again -> start a transaction and record the decision to commit
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2734,9 +2735,9 @@ public class QueryTranslator extends AbstractLangTranslator {
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
-            //We don't release the latch since this job is expected to be quick
+            // We don't release the latch since this job is expected to be quick
             JobUtils.runJob(hcc, spec, true);
-            //Start a new metadata transaction to record the final state of the transaction
+            // Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2749,11 +2750,11 @@ public class QueryTranslator extends AbstractLangTranslator {
                     while (iterator.hasNext()) {
                         ExternalFile appendedFile = iterator.next();
                         if (file.getFileName().equals(appendedFile.getFileName())) {
-                            //delete existing file
+                            // delete existing file
                             MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                            //delete existing appended file
+                            // delete existing appended file
                             MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile);
-                            //add the original file with appended information
+                            // add the original file with appended information
                             appendedFile.setFileNumber(file.getFileNumber());
                             appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                             MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile);
@@ -2763,24 +2764,24 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
 
-            //remove the deleted files delta
+            // remove the deleted files delta
             for (ExternalFile file : deletedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
             }
 
-            //insert new files
+            // insert new files
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                 file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            //mark the transaction as complete
+            // mark the transaction as complete
             ((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
                     .setState(ExternalDatasetTransactionState.COMMIT);
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
 
-            //commit metadata transaction
+            // commit metadata transaction
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             success = true;
         } catch (Exception e) {
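
The refresh is a three-state mini-transaction recorded in the dataset record itself: BEGIN while the delta jobs run, READY_TO_COMMIT once every index update has succeeded, and COMMIT after the commit job and the file reconciliation above. The recovery logic in the catch block that follows keys off that state; schematically (helper names hypothetical, states as in the code):

    // Recovery policy by refresh transaction state; helpers are hypothetical.
    switch (transactionState) {
        case COMMIT:
            // The commit was recorded; nothing to undo, just rethrow the original failure.
            break;
        case READY_TO_COMMIT:
            // The commit decision is durable but unfinished; recovery must roll forward.
            throw new IllegalStateException("refresh couldn't carry out the commit phase");
        case BEGIN:
            runAbortJob();                  // clean transaction components off the NCs
            restoreCommittedState();        // drop the delta files, restore the dataset record
            break;
        default:
            break;
    }
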
@@ -2792,12 +2793,12 @@ public class QueryTranslator extends AbstractLangTranslator {
                         + datasetName + ") refresh couldn't carry out the commit phase", e);
             }
             if (transactionState == ExternalDatasetTransactionState.COMMIT) {
-                //Nothing to do , everything should be clean
+                // Nothing to do; everything should be clean
                 throw e;
             }
             if (transactionState == ExternalDatasetTransactionState.BEGIN) {
-                //transaction failed, need to do the following
-                //clean NCs removing transaction components
+                // the transaction failed; we need to do the following:
+                // clean the NCs by removing the transaction components
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2807,12 +2808,12 @@ public class QueryTranslator extends AbstractLangTranslator {
                 try {
                     JobUtils.runJob(hcc, spec, true);
                 } catch (Exception e2) {
-                    //This should never happen -- fix throw illegal
+                    // This should never happen -- if it does, throw IllegalStateException below
                     e.addSuppressed(e2);
                     throw new IllegalStateException("System is in inconsistent state. Failed to abort refresh", e);
                 }
-                //remove the delta of files
-                //return the state of the dataset to committed
+                // remove the delta of files
+                // return the state of the dataset to committed
                 try {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     for (ExternalFile file : deletedFiles) {
@@ -2825,7 +2826,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                         MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                     }
                     MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
-                    //commit metadata transaction
+                    // commit metadata transaction
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     abort(e, e2, mdTxnCtx);
@@ -2878,20 +2879,20 @@ public class QueryTranslator extends AbstractLangTranslator {
                     datasetNameFrom, datasetNameTo, mdTxnCtx);
 
             String pregelixHomeKey = "PREGELIX_HOME";
-            //Finds PREGELIX_HOME in system environment variables.
+            // Finds PREGELIX_HOME in system environment variables.
             String pregelixHome = System.getenv(pregelixHomeKey);
-            //Finds PREGELIX_HOME in Java properties.
+            // Finds PREGELIX_HOME in Java properties.
             if (pregelixHome == null) {
                 pregelixHome = System.getProperty(pregelixHomeKey);
             }
-            //Finds PREGELIX_HOME in AsterixDB configuration.
+            // Finds PREGELIX_HOME in AsterixDB configuration.
             if (pregelixHome == null) {
-                //Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
-                //pregelixHome can never be null.
+                // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
+                // pregelixHome can never be null.
                 pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
             }
 
-            //Constructs the pregelix command line.
+            // Constructs the pregelix command line.
             List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
                     dataverseNameTo, datasetNameTo);
             ProcessBuilder pb = new ProcessBuilder(cmd);
@@ -2900,9 +2901,9 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            //Executes the Pregelix command.
+            // Executes the Pregelix command.
             int resultState = executeExternalShellProgram(pb);
-            //Checks the return state of the external Pregelix command.
+            // Checks the return state of the external Pregelix command.
             if (resultState != 0) {
                 throw new AlgebricksException(
                         "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. "
@@ -2920,12 +2921,12 @@ public class QueryTranslator extends AbstractLangTranslator {
         }
     }
 
-    //Prepares to run a program on external runtime.
+    // Prepares to run a program on an external runtime.
     private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException, AsterixException, Exception {
-        //Validates the source/sink dataverses and datasets.
+        // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {
             throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
@@ -2938,7 +2939,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         }
 
         try {
-            //Find the primary index of the sink dataset.
+            // Find the primary index of the sink dataset.
             Index toIndex = null;
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
                     pregelixStmt.getDatasetNameTo().getValue());
@@ -2951,7 +2952,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             if (toIndex == null) {
                 throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
             }
-            //Cleans up the sink dataset -- Drop and then Create.
+            // Cleans up the sink dataset -- Drop and then Create.
             DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
                     true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
@@ -2970,12 +2971,12 @@ public class QueryTranslator extends AbstractLangTranslator {
             throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
         }
 
-        //Flushes source dataset.
+        // Flushes source dataset.
         FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom,
                 datasetNameFrom);
     }
 
-    //Executes external shell commands.
+    // Executes external shell commands.
     private int executeExternalShellProgram(ProcessBuilder pb)
             throws IOException, AlgebricksException, InterruptedException {
         Process process = pb.start();
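
The elided portion of this method (between this hunk and the next) handles the child process's output; the overall start/drain/wait/exit-value shape can be sketched as self-contained Java (a generic pattern, not the exact AsterixDB method):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    // Generic start/drain/wait/exit-value pattern for an external process.
    static int runAndWait(ProcessBuilder pb) throws IOException, InterruptedException {
        pb.redirectErrorStream(true);          // fold stderr into stdout
        Process process = pb.start();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);      // drain so the child never blocks on a full pipe
            }
        }
        process.waitFor();                     // wait, so exitValue() is legal to call
        return process.exitValue();
    }
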
@@ -3001,15 +3002,15 @@ public class QueryTranslator extends AbstractLangTranslator {
             }
             process.waitFor();
         }
-        //Gets the exit value of the program.
+        // Gets the exit value of the program.
         int resultState = process.exitValue();
         return resultState;
     }
 
-    //Constructs a Pregelix command line.
+    // Constructs a Pregelix command line.
     private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
             String fromDatasetName, String toDataverseName, String toDatasetName) {
-        //Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
+        // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
         String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
@@ -3024,7 +3025,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ",");
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
 
-        //construct command
+        // construct command
         List<String> cmds = new ArrayList<String>();
         cmds.add("bin/pregelix");
         cmds.add(pregelixStmt.getParameters().get(0)); // jar
@@ -3037,7 +3038,7 @@ public class QueryTranslator extends AbstractLangTranslator {
         String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
         boolean custPropAdded = false;
         boolean meetCustProp = false;
-        //User parameters.
+        // User parameters.
         for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
             if (meetCustProp) {
                 if (!s.contains(inputConverterClassKey)) {
@@ -3059,10 +3060,10 @@ public class QueryTranslator extends AbstractLangTranslator {
 
         if (!custPropAdded) {
             cmds.add(customizedPregelixProperty);
-            //Appends default converter classes to asterixdbParameterBuilder.
+            // Appends default converter classes to asterixdbParameterBuilder.
             asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
             asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
-            //Remove the last comma.
+            // Remove the last comma.
             asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
                     asterixdbParameterBuilder.length());
             cmds.add(asterixdbParameterBuilder.toString());
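
The delete(...) call above exists only to strip the trailing comma left by the append loop; the same string can be assembled without the fix-up, for example (a sketch only, not a proposed change):

    // Trailing-comma-free alternative to builder-then-delete (Java 8+).
    java.util.List<String> params = new java.util.ArrayList<>();
    params.add("pregelix.asterixdb.output.dataset=" + toDatasetName);
    params.add("pregelix.asterixdb.output.cleanup=false");
    String joined = String.join(",", params);  // nothing to trim afterwards
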

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fd014710/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 2378032..9dd4025 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -37,7 +37,7 @@ import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
 import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -91,8 +91,8 @@ public class CCMessageBroker implements ICCMessageBroker {
     }
 
     private void handleFeedProviderReady(IMessage message) {
-        FeedProviderReadyMessage msg = (FeedProviderReadyMessage) message;
-        FeedLifecycleListener.INSTANCE.notifyProviderReady(msg.getFeedId(), msg.getJobId());
+        FeedPartitionStartMessage msg = (FeedPartitionStartMessage) message;
+        FeedLifecycleListener.INSTANCE.notifyPartitionStart(msg.getFeedId(), msg.getJobId());
     }
 
     private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
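
The renamed message ties the broker into the per-partition accounting introduced elsewhere in this change: each intake partition reports in when it starts, and intake can be considered live only once every partition has checked in. A plausible shape for that bookkeeping (hypothetical; the real logic lives in FeedLifecycleListener.notifyPartitionStart, which is not shown here):

    // Hypothetical countdown over intake partitions; illustrates the idea behind
    // notifyPartitionStart, not the actual FeedLifecycleListener implementation.
    class PartitionStartTracker {
        private int remaining;

        PartitionStartTracker(int partitionCount) {
            this.remaining = partitionCount;
        }

        // One call per FeedPartitionStartMessage received by the CC.
        synchronized boolean notifyPartitionStart() {
            return --remaining == 0;           // true once every partition has started
        }
    }
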