You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:18 UTC

[24/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
new file mode 100644
index 0000000..b65d8b1
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedTrackingManager.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.feed.api.IFeedTrackingManager;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.file.FeedOperations;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedTrackingManager implements IFeedTrackingManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
+
+    private final BitSet allOnes;
+
+    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
+    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
+
+    public FeedTrackingManager() {
+        byte[] allOneBytes = new byte[128];
+        Arrays.fill(allOneBytes, (byte) 0xff);
+        allOnes = BitSet.valueOf(allOneBytes);
+        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
+        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
+    }
+
+    @Override
+    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
+        AckId ackId = getAckId(ackMessage);
+        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
+        if (acksForConnection == null) {
+            acksForConnection = new HashMap<AckId, BitSet>();
+            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
+            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
+        }
+        BitSet currentAcks = acksForConnection.get(ackId);
+        if (currentAcks == null) {
+            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
+            acksForConnection.put(ackId, currentAcks);
+        } else {
+            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
+        }
+        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
+            }
+            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
+            if (maxBaseAckedForConnection == null) {
+                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
+                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
+            }
+            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
+            if (maxBaseAckedValue == null) {
+                maxBaseAckedValue = ackMessage.getBase();
+                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+                        ackMessage.getBase());
+            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
+                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+                        ackMessage.getBase());
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
+                }
+            }
+
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
+            }
+        }
+    }
+
+    public synchronized void disableTracking(FeedConnectionId connectionId) {
+        ackHistory.remove(connectionId);
+        maxBaseAcked.remove(connectionId);
+    }
+
+    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
+        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+        String collectLocation = collectLocations.get(partition);
+        Set<String> messageDestinations = new HashSet<String>();
+        messageDestinations.add(collectLocation);
+        messageDestinations.addAll(storageLocations);
+        try {
+            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
+            CentralFeedManager.runJob(spec, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
+            }
+        }
+    }
+
+    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
+        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
+    }
+
+    private static class AckId {
+        private FeedConnectionId connectionId;
+        private int intakePartition;
+        private int base;
+
+        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
+            this.connectionId = connectionId;
+            this.intakePartition = intakePartition;
+            this.base = base;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof AckId)) {
+                return false;
+            }
+            AckId other = (AckId) o;
+            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
+                    && other.getBase() == base;
+        }
+
+        @Override
+        public String toString() {
+            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
+        }
+
+        @Override
+        public int hashCode() {
+            return toString().hashCode();
+        }
+
+        public FeedConnectionId getConnectionId() {
+            return connectionId;
+        }
+
+        public int getIntakePartition() {
+            return intakePartition;
+        }
+
+        public int getBase() {
+            return base;
+        }
+
+    }
+
+    @Override
+    public void disableAcking(FeedConnectionId connectionId) {
+        ackHistory.remove(connectionId);
+        maxBaseAcked.remove(connectionId);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Acking disabled for " + connectionId);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
new file mode 100644
index 0000000..9d746c8
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkCollection.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.api.common.SessionConfig.OutputFormat;
+import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.feed.api.IFeedWork;
+import org.apache.asterix.external.feed.api.IFeedWorkEventListener;
+import org.apache.asterix.external.feed.management.FeedCollectInfo;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest;
+import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus;
+import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * A collection of feed management related task, each represented as an implementation of {@code IFeedWork}.
+ */
+public class FeedWorkCollection {
+
+    private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName());
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+
+    /**
+     * The task of subscribing to a feed to obtain data.
+     */
+    public static class SubscribeFeedWork implements IFeedWork {
+
+        private final Runnable runnable;
+
+        private final FeedConnectionRequest request;
+
+        @Override
+        public Runnable getRunnable() {
+            return runnable;
+        }
+
+        public SubscribeFeedWork(String[] locations, FeedConnectionRequest request) {
+            this.runnable = new SubscribeFeedWorkRunnable(locations, request);
+            this.request = request;
+        }
+
+        private static class SubscribeFeedWorkRunnable implements Runnable {
+
+            private final FeedConnectionRequest request;
+            private final String[] locations;
+
+            public SubscribeFeedWorkRunnable(String[] locations, FeedConnectionRequest request) {
+                this.request = request;
+                this.locations = locations;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    PrintWriter writer = new PrintWriter(System.out, true);
+                    SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+                    DataverseDecl dataverseDecl = new DataverseDecl(
+                            new Identifier(request.getReceivingFeedId().getDataverse()));
+                    SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request);
+                    List<Statement> statements = new ArrayList<Statement>();
+                    statements.add(dataverseDecl);
+                    statements.add(subscribeStmt);
+                    QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+                    translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                            QueryTranslator.ResultDelivery.SYNC);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Submitted connection requests for execution: " + request);
+                    }
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Exception in executing " + request);
+                    }
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public static class FeedSubscribeWorkEventListener implements IFeedWorkEventListener {
+
+            @Override
+            public void workFailed(IFeedWork work, Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request
+                            + " failed with exception " + e);
+                }
+            }
+
+            @Override
+            public void workCompleted(IFeedWork work) {
+                ((SubscribeFeedWork) work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed ");
+                }
+            }
+
+        }
+
+        public FeedConnectionRequest getRequest() {
+            return request;
+        }
+
+        @Override
+        public String toString() {
+            return "SubscribeFeedWork for [" + request + "]";
+        }
+
+    }
+
+    /**
+     * The task of activating a set of feeds.
+     */
+    public static class ActivateFeedWork implements IFeedWork {
+
+        private final Runnable runnable;
+
+        @Override
+        public Runnable getRunnable() {
+            return runnable;
+        }
+
+        public ActivateFeedWork(List<FeedCollectInfo> feedsToRevive) {
+            this.runnable = new FeedsActivateRunnable(feedsToRevive);
+        }
+
+        public ActivateFeedWork() {
+            this.runnable = new FeedsActivateRunnable();
+        }
+
+        private static class FeedsActivateRunnable implements Runnable {
+
+            private List<FeedCollectInfo> feedsToRevive;
+            private Mode mode;
+
+            public enum Mode {
+                REVIVAL_POST_NODE_REJOIN
+            }
+
+            public FeedsActivateRunnable(List<FeedCollectInfo> feedsToRevive) {
+                this.feedsToRevive = feedsToRevive;
+            }
+
+            public FeedsActivateRunnable() {
+            }
+
+            @Override
+            public void run() {
+                switch (mode) {
+                    case REVIVAL_POST_NODE_REJOIN:
+                        try {
+                            Thread.sleep(10000);
+                        } catch (InterruptedException e1) {
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Attempt to resume feed interrupted");
+                            }
+                            throw new IllegalStateException(e1.getMessage());
+                        }
+                        for (FeedCollectInfo finfo : feedsToRevive) {
+                            try {
+                                JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                                if (LOGGER.isLoggable(Level.INFO)) {
+                                    LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+                                    LOGGER.info("Job:" + finfo.jobSpec);
+                                }
+                            } catch (Exception e) {
+                                if (LOGGER.isLoggable(Level.WARNING)) {
+                                    LOGGER.warning(
+                                            "Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
+                                }
+                            }
+                        }
+                }
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java
new file mode 100644
index 0000000..b30d8a7
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedWorkRequestResponseHandler.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
+import org.apache.asterix.external.feed.watch.FeedJobInfo;
+import org.apache.asterix.metadata.cluster.AddNodeWork;
+import org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedWorkRequestResponseHandler implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
+
+    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
+
+    private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
+
+    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
+        this.inbox = inbox;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            IClusterManagementWorkResponse response = null;
+            try {
+                response = inbox.take();
+            } catch (InterruptedException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Interrupted exception " + e.getMessage());
+                }
+            }
+            IClusterManagementWork submittedWork = response.getWork();
+            Map<String, String> nodeSubstitution = new HashMap<String, String>();
+            switch (submittedWork.getClusterManagementWorkType()) {
+                case ADD_NODE:
+                    AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
+                    int workId = addNodeWork.getWorkId();
+                    Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
+                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+                    List<String> nodesAdded = resp.getNodesAdded();
+                    List<String> unsubstitutedNodes = new ArrayList<String>();
+                    unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
+                    int nodeIndex = 0;
+
+                    /** form a mapping between the failed node and its substitute **/
+                    if (nodesAdded != null && nodesAdded.size() > 0) {
+                        for (String failedNodeId : addNodeWork.getDeadNodes()) {
+                            String substitute = nodesAdded.get(nodeIndex);
+                            nodeSubstitution.put(failedNodeId, substitute);
+                            nodeIndex = (nodeIndex + 1) % nodesAdded.size();
+                            unsubstitutedNodes.remove(failedNodeId);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
+                            }
+                        }
+                    }
+                    if (unsubstitutedNodes.size() > 0) {
+                        String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes()
+                                .toArray(new String[] {});
+                        nodeIndex = 0;
+                        for (String unsubstitutedNode : unsubstitutedNodes) {
+                            nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
+                                        + unsubstitutedNode);
+                            }
+                            nodeIndex = (nodeIndex + 1) % participantNodes.length;
+                        }
+
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
+                        }
+                    }
+
+                    // alter failed feed intake jobs
+
+                    for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
+                        String failedNode = entry.getKey();
+                        List<FeedJobInfo> impactedJobInfos = entry.getValue();
+                        for (FeedJobInfo info : impactedJobInfos) {
+                            JobSpecification spec = info.getSpec();
+                            replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
+                            info.setSpec(spec);
+                        }
+                    }
+
+                    Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
+                    Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
+
+                    for (List<FeedJobInfo> infos : failureAnalysis.values()) {
+                        for (FeedJobInfo info : infos) {
+                            switch (info.getJobType()) {
+                                case INTAKE:
+                                    revisedIntakeJobs.add((FeedIntakeInfo) info);
+                                    break;
+                                case FEED_CONNECT:
+                                    revisedConnectJobInfos.add((FeedConnectJobInfo) info);
+                                    break;
+                            }
+                        }
+                    }
+
+                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+                    try {
+                        for (FeedIntakeInfo info : revisedIntakeJobs) {
+                            hcc.startJob(info.getSpec());
+                        }
+                        Thread.sleep(2000);
+                        for (FeedConnectJobInfo info : revisedConnectJobInfos) {
+                            hcc.startJob(info.getSpec());
+                            Thread.sleep(2000);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to start revised job post failure");
+                        }
+                    }
+
+                    break;
+                case REMOVE_NODE:
+                    throw new IllegalStateException("Invalid work submitted");
+            }
+        }
+    }
+
+    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
+        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
+        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+        OperatorDescriptorId opId = null;
+        for (Constraint constraint : userConstraints) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT:
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    if (modifiedOperators.contains(opId)) {
+                        countConstraintsToReplace.add(constraint);
+                    } else {
+                        List<Constraint> clist = candidateConstraints.get(opId);
+                        if (clist == null) {
+                            clist = new ArrayList<Constraint>();
+                            candidateConstraints.put(opId, clist);
+                        }
+                        clist.add(constraint);
+                    }
+                    break;
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                    if (oldLocation.equals(failedNodeId)) {
+                        locationConstraintsToReplace.add(constraint);
+                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+                        Map<Integer, String> newLocs = newConstraints.get(opId);
+                        if (newLocs == null) {
+                            newLocs = new HashMap<Integer, String>();
+                            newConstraints.put(opId, newLocs);
+                        }
+                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                        newLocs.put(partition, replacementNode);
+                    } else {
+                        if (modifiedOperators.contains(opId)) {
+                            locationConstraintsToReplace.add(constraint);
+                            Map<Integer, String> newLocs = newConstraints.get(opId);
+                            if (newLocs == null) {
+                                newLocs = new HashMap<Integer, String>();
+                                newConstraints.put(opId, newLocs);
+                            }
+                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                            newLocs.put(partition, oldLocation);
+                        } else {
+                            List<Constraint> clist = candidateConstraints.get(opId);
+                            if (clist == null) {
+                                clist = new ArrayList<Constraint>();
+                                candidateConstraints.put(opId, clist);
+                            }
+                            clist.add(constraint);
+                        }
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+        for (OperatorDescriptorId mopId : modifiedOperators) {
+            List<Constraint> clist = candidateConstraints.get(mopId);
+            if (clist != null && !clist.isEmpty()) {
+                jobSpec.getUserConstraints().removeAll(clist);
+
+                for (Constraint c : clist) {
+                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+                        ConstraintExpression cexpr = c.getRValue();
+                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
+                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                        newConstraints.get(mopId).put(partition, oldLocation);
+                    }
+                }
+            }
+        }
+
+        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
+            OperatorDescriptorId nopId = entry.getKey();
+            Map<Integer, String> clist = entry.getValue();
+            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+            String[] locations = new String[clist.size()];
+            for (int i = 0; i < locations.length; i++) {
+                locations[i] = clist.get(i);
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
+        }
+
+    }
+
+    public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
+        feedsWaitingForResponse.put(workId, impactedJobs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java
new file mode 100644
index 0000000..1a8a119
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedsActivator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.feed;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.api.common.SessionConfig.OutputFormat;
+import org.apache.asterix.aql.translator.QueryTranslator;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.feed.management.FeedCollectInfo;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.DataverseDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.job.JobId;
+
+public class FeedsActivator implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
+    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
+
+    private List<FeedCollectInfo> feedsToRevive;
+    private Mode mode;
+
+    public enum Mode {
+        REVIVAL_POST_CLUSTER_REBOOT,
+        REVIVAL_POST_NODE_REJOIN
+    }
+
+    public FeedsActivator() {
+        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+    }
+
+    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
+        this.feedsToRevive = feedsToRevive;
+        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+    }
+
+    @Override
+    public void run() {
+        switch (mode) {
+            case REVIVAL_POST_CLUSTER_REBOOT:
+                //revivePostClusterReboot();
+                break;
+            case REVIVAL_POST_NODE_REJOIN:
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e1) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Attempt to resume feed interrupted");
+                    }
+                    throw new IllegalStateException(e1.getMessage());
+                }
+                for (FeedCollectInfo finfo : feedsToRevive) {
+                    try {
+                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+                            LOGGER.info("Job:" + finfo.jobSpec);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
+                        }
+                    }
+                }
+        }
+    }
+
+    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+        PrintWriter writer = new PrintWriter(System.out, true);
+        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+        try {
+            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
+                    new Identifier(dataset), feedPolicy, 0);
+            stmt.setForceConnect(true);
+            List<Statement> statements = new ArrayList<Statement>();
+            statements.add(dataverseDecl);
+            statements.add(stmt);
+            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                    QueryTranslator.ResultDelivery.SYNC);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
+                        + feedPolicy + " Exception " + e.getMessage());
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java
deleted file mode 100644
index 7326d08..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/CentralFeedManager.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringReader;
-import java.util.List;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.ICentralFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedLoadManager;
-import org.apache.asterix.common.feeds.api.IFeedTrackingManager;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.aql.parser.AQLParserFactory;
-import org.apache.asterix.lang.common.base.IParser;
-import org.apache.asterix.lang.common.base.IParserFactory;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.metadata.feeds.SocketMessageListener;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class CentralFeedManager implements ICentralFeedManager {
-
-    private static final ICentralFeedManager centralFeedManager = new CentralFeedManager();
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    public static ICentralFeedManager getInstance() {
-        return centralFeedManager;
-    }
-
-    private final int port;
-    private final IFeedLoadManager feedLoadManager;
-    private final IFeedTrackingManager feedTrackingManager;
-    private final SocketMessageListener messageListener;
-
-    private CentralFeedManager() {
-        this.port = AsterixAppContextInfo.getInstance().getFeedProperties().getFeedCentralManagerPort();
-        this.feedLoadManager = new FeedLoadManager();
-        this.feedTrackingManager = new FeedTrackingManager();
-        this.messageListener = new SocketMessageListener(port, new FeedMessageReceiver(this));
-    }
-
-    @Override
-    public void start() throws AsterixException {
-        messageListener.start();
-    }
-
-    @Override
-    public void stop() throws AsterixException, IOException {
-        messageListener.stop();
-    }
-
-    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobId jobId = hcc.startJob(spec);
-        if (waitForCompletion) {
-            hcc.waitForCompletion(jobId);
-        }
-        return jobId;
-    }
-
-    @Override
-    public IFeedLoadManager getFeedLoadManager() {
-        return feedLoadManager;
-    }
-
-    @Override
-    public IFeedTrackingManager getFeedTrackingManager() {
-        return feedTrackingManager;
-    }
-
-    public static class AQLExecutor {
-
-        private static final PrintWriter out = new PrintWriter(System.out, true);
-        private static final IParserFactory parserFactory = new AQLParserFactory();
-
-        public static void executeAQL(String aql) throws Exception {
-            IParser parser = parserFactory.createParser(new StringReader(aql));
-            List<Statement> statements = parser.parse();
-            SessionConfig pc = new SessionConfig(out, OutputFormat.ADM);
-            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                    QueryTranslator.ResultDelivery.SYNC);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedCollectInfo.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedCollectInfo.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedCollectInfo.java
deleted file mode 100644
index aca6ab6..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedCollectInfo.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedCollectInfo extends FeedInfo {
-    public FeedId sourceFeedId;
-    public FeedConnectionId feedConnectionId;
-    public List<String> collectLocations = new ArrayList<String>();
-    public List<String> computeLocations = new ArrayList<String>();
-    public List<String> storageLocations = new ArrayList<String>();
-    public Map<String, String> feedPolicy;
-    public String superFeedManagerHost;
-    public int superFeedManagerPort;
-    public boolean fullyConnected;
-
-    public FeedCollectInfo(FeedId sourceFeedId, FeedConnectionId feedConnectionId, JobSpecification jobSpec,
-            JobId jobId, Map<String, String> feedPolicy) {
-        super(jobSpec, jobId, FeedInfoType.COLLECT);
-        this.sourceFeedId = sourceFeedId;
-        this.feedConnectionId = feedConnectionId;
-        this.feedPolicy = feedPolicy;
-        this.fullyConnected = true;
-    }
-
-    @Override
-    public String toString() {
-        return FeedInfoType.COLLECT + "[" + feedConnectionId + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedInfo.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedInfo.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedInfo.java
deleted file mode 100644
index a247488..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedInfo.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedInfo {
-    public JobSpecification jobSpec;
-    public JobInfo jobInfo;
-    public JobId jobId;
-    public FeedInfoType infoType;
-    public State state;
-
-    public enum State {
-        ACTIVE,
-        INACTIVE
-    }
-
-    public enum FeedInfoType {
-        INTAKE,
-        COLLECT
-    }
-
-    public FeedInfo(JobSpecification jobSpec, JobId jobId, FeedInfoType infoType) {
-        this.jobSpec = jobSpec;
-        this.jobId = jobId;
-        this.infoType = infoType;
-        this.state = State.INACTIVE;
-    }
-
-    @Override
-    public String toString() {
-        return " job id " + jobId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJobNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJobNotificationHandler.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJobNotificationHandler.java
deleted file mode 100644
index aa7388e..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJobNotificationHandler.java
+++ /dev/null
@@ -1,743 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.asterix.api.common.FeedWorkCollection.SubscribeFeedWork;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.feeds.FeedActivity;
-import org.apache.asterix.common.feeds.FeedConnectJobInfo;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConnectionRequest;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedIntakeInfo;
-import org.apache.asterix.common.feeds.FeedJobInfo;
-import org.apache.asterix.common.feeds.FeedJobInfo.FeedJobState;
-import org.apache.asterix.common.feeds.FeedJobInfo.JobType;
-import org.apache.asterix.common.feeds.FeedJointKey;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.asterix.common.feeds.api.IFeedJoint.State;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
-import org.apache.asterix.feeds.FeedLifecycleListener.Message;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.FeedWorkManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
-
-public class FeedJobNotificationHandler implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
-
-    private final LinkedBlockingQueue<Message> inbox;
-    private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
-
-    private final Map<JobId, FeedJobInfo> jobInfos;
-    private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
-    private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
-    private final Map<FeedId, List<IFeedJoint>> feedPipeline;
-    private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
-
-    public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
-        this.inbox = inbox;
-        this.jobInfos = new HashMap<JobId, FeedJobInfo>();
-        this.intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>();
-        this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
-        this.feedPipeline = new HashMap<FeedId, List<IFeedJoint>>();
-        this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
-        this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
-    }
-
-    @Override
-    public void run() {
-        Message mesg;
-        while (true) {
-            try {
-                mesg = inbox.take();
-                switch (mesg.messageKind) {
-                    case JOB_START:
-                        handleJobStartMessage(mesg);
-                        break;
-                    case JOB_FINISH:
-                        handleJobFinishMessage(mesg);
-                        break;
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-
-        }
-    }
-
-    public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
-            IIntakeProgressTracker feedIntakeProgressTracker) {
-        if (feedIntakeProgressTrackers.get(connectionId) == null) {
-            this.feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>(
-                    feedIntakeProgressTracker, 0L));
-        } else {
-            throw new IllegalStateException(" Progress tracker for connection " + connectionId
-                    + " is alreader registered");
-        }
-    }
-
-    public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
-        this.feedIntakeProgressTrackers.remove(connectionId);
-    }
-
-    public void updateTrackingInformation(StorageReportFeedMessage srm) {
-        Pair<IIntakeProgressTracker, Long> p = feedIntakeProgressTrackers.get(srm.getConnectionId());
-        if (p != null && p.second < srm.getLastPersistedTupleIntakeTimestamp()) {
-            p.second = srm.getLastPersistedTupleIntakeTimestamp();
-            p.first.notifyIngestedTupleTimestamp(p.second);
-        }
-    }
-
-    public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
-        return intakeJobInfos.values();
-    }
-
-    public Collection<FeedConnectJobInfo> getFeedConnectInfos() {
-        return connectJobInfos.values();
-    }
-
-    public void registerFeedJoint(IFeedJoint feedJoint) {
-        List<IFeedJoint> feedJointsOnPipeline = feedPipeline.get(feedJoint.getOwnerFeedId());
-        if (feedJointsOnPipeline == null) {
-            feedJointsOnPipeline = new ArrayList<IFeedJoint>();
-            feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
-            feedJointsOnPipeline.add(feedJoint);
-        } else {
-            if (!feedJointsOnPipeline.contains(feedJoint)) {
-                feedJointsOnPipeline.add(feedJoint);
-            } else {
-                throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
-            }
-        }
-    }
-
-    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException {
-        if (jobInfos.get(jobId) != null) {
-            throw new IllegalStateException("Feed job already registered");
-        }
-
-        List<IFeedJoint> joints = feedPipeline.get(feedId);
-        IFeedJoint intakeJoint = null;
-        for (IFeedJoint joint : joints) {
-            if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
-                intakeJoint = joint;
-                break;
-            }
-        }
-
-        if (intakeJoint != null) {
-            FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE,
-                    feedId, intakeJoint, jobSpec);
-            intakeJobInfos.put(feedId, intakeJobInfo);
-            jobInfos.put(jobId, intakeJobInfo);
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
-            }
-        } else {
-            throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed  "
-                    + feedId);
-        }
-    }
-
-    public void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
-            JobSpecification jobSpec, Map<String, String> feedPolicy) {
-        if (jobInfos.get(jobId) != null) {
-            throw new IllegalStateException("Feed job already registered");
-        }
-
-        List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId);
-        FeedConnectionId cid = null;
-        IFeedJoint sourceFeedJoint = null;
-        for (IFeedJoint joint : feedJoints) {
-            cid = joint.getReceiver(connectionId);
-            if (cid != null) {
-                sourceFeedJoint = joint;
-                break;
-            }
-        }
-
-        if (cid != null) {
-            FeedConnectJobInfo cInfo = new FeedConnectJobInfo(jobId, FeedJobState.CREATED, connectionId,
-                    sourceFeedJoint, null, jobSpec, feedPolicy);
-            jobInfos.put(jobId, cInfo);
-            connectJobInfos.put(connectionId, cInfo);
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered feed connection [" + jobId + "]" + " for feed " + connectionId);
-            }
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Could not register feed collection job [" + jobId + "]" + " for feed connection "
-                        + connectionId);
-            }
-        }
-
-    }
-
-    public void deregisterFeedIntakeJob(JobId jobId) {
-        if (jobInfos.get(jobId) == null) {
-            throw new IllegalStateException(" Feed Intake job not registered ");
-        }
-
-        FeedIntakeInfo info = (FeedIntakeInfo) jobInfos.get(jobId);
-        jobInfos.remove(jobId);
-        intakeJobInfos.remove(info.getFeedId());
-
-        if (!info.getState().equals(FeedJobState.UNDER_RECOVERY)) {
-            List<IFeedJoint> joints = feedPipeline.get(info.getFeedId());
-            joints.remove(info.getIntakeFeedJoint());
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Deregistered feed intake job [" + jobId + "]");
-            }
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Not removing feed joint as intake job is in " + FeedJobState.UNDER_RECOVERY + " state.");
-            }
-        }
-
-    }
-
-    private void handleJobStartMessage(Message message) throws Exception {
-        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
-        switch (jobInfo.getJobType()) {
-            case INTAKE:
-                handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
-                break;
-            case FEED_CONNECT:
-                handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
-                break;
-        }
-
-    }
-
-    private void handleJobFinishMessage(Message message) throws Exception {
-        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
-        switch (jobInfo.getJobType()) {
-            case INTAKE:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
-                }
-                handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
-                break;
-            case FEED_CONNECT:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Collect Job finished for  " + (FeedConnectJobInfo) jobInfo);
-                }
-                handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
-                break;
-        }
-
-    }
-
-    private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
-        List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<OperatorDescriptorId>();
-        Map<OperatorDescriptorId, IOperatorDescriptor> operators = intakeJobInfo.getSpec().getOperatorMap();
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-            IOperatorDescriptor opDesc = entry.getValue();
-            if (opDesc instanceof FeedIntakeOperatorDescriptor) {
-                intakeOperatorIds.add(opDesc.getOperatorId());
-            }
-        }
-
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
-        List<String> intakeLocations = new ArrayList<String>();
-        for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
-            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(intakeOperatorId);
-            int nOperatorInstances = operatorLocations.size();
-            for (int i = 0; i < nOperatorInstances; i++) {
-                intakeLocations.add(operatorLocations.get(i));
-            }
-        }
-        // intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator
-        intakeJobInfo.setIntakeLocation(intakeLocations);
-        intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
-        intakeJobInfo.setState(FeedJobState.ACTIVE);
-
-        // notify event listeners 
-        notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
-    }
-
-    private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
-        // set locations of feed sub-operations (intake, compute, store)
-        setLocations(cInfo);
-
-        // activate joints
-        List<IFeedJoint> joints = feedPipeline.get(cInfo.getConnectionId().getFeedId());
-        for (IFeedJoint joint : joints) {
-            if (joint.getProvider().equals(cInfo.getConnectionId())) {
-                joint.setState(State.ACTIVE);
-                if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
-                    cInfo.setComputeFeedJoint(joint);
-                }
-            }
-        }
-        cInfo.setState(FeedJobState.ACTIVE);
-
-        // register activity in metadata
-        registerFeedActivity(cInfo);
-        // notify event listeners
-        notifyFeedEventSubscribers(cInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
-    }
-
-    private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
-        JobType jobType = jobInfo.getJobType();
-        List<FeedConnectionId> impactedConnections = new ArrayList<FeedConnectionId>();
-        if (jobType.equals(JobType.INTAKE)) {
-            FeedId feedId = ((FeedIntakeInfo) jobInfo).getFeedId();
-            for (FeedConnectionId connId : eventSubscribers.keySet()) {
-                if (connId.getFeedId().equals(feedId)) {
-                    impactedConnections.add(connId);
-                }
-            }
-        } else {
-            impactedConnections.add(((FeedConnectJobInfo) jobInfo).getConnectionId());
-        }
-
-        for (FeedConnectionId connId : impactedConnections) {
-            List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connId);
-            if (subscribers != null && !subscribers.isEmpty()) {
-                for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
-                    subscriber.handleFeedEvent(event);
-                }
-            }
-        }
-    }
-
-    public synchronized void submitFeedConnectionRequest(IFeedJoint feedJoint, final FeedConnectionRequest request)
-            throws Exception {
-        List<String> locations = null;
-        switch (feedJoint.getType()) {
-            case INTAKE:
-                FeedIntakeInfo intakeInfo = intakeJobInfos.get(feedJoint.getOwnerFeedId());
-                locations = intakeInfo.getIntakeLocation();
-                break;
-            case COMPUTE:
-                FeedConnectionId connectionId = feedJoint.getProvider();
-                FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-                locations = cInfo.getComputeLocations();
-                break;
-        }
-
-        SubscribeFeedWork work = new SubscribeFeedWork(locations.toArray(new String[] {}), request);
-        FeedWorkManager.INSTANCE.submitWork(work, new SubscribeFeedWork.FeedSubscribeWorkEventListener());
-    }
-
-    public IFeedJoint getSourceFeedJoint(FeedConnectionId connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        if (cInfo != null) {
-            return cInfo.getSourceFeedJoint();
-        }
-        return null;
-    }
-
-    public Set<FeedConnectionId> getActiveFeedConnections() {
-        Set<FeedConnectionId> activeConnections = new HashSet<FeedConnectionId>();
-        for (FeedConnectJobInfo cInfo : connectJobInfos.values()) {
-            if (cInfo.getState().equals(FeedJobState.ACTIVE)) {
-                activeConnections.add(cInfo.getConnectionId());
-            }
-        }
-        return activeConnections;
-    }
-
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        if (cInfo != null) {
-            return cInfo.getState().equals(FeedJobState.ACTIVE);
-        }
-        return false;
-    }
-
-    public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
-        FeedConnectJobInfo connectJobInfo = connectJobInfos.get(connectionId);
-        connectJobInfo.setState(jobState);
-    }
-
-    public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getState();
-    }
-
-    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception {
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobInfo info = hcc.getJobInfo(message.jobId);
-        JobStatus status = info.getStatus();
-        FeedLifecycleEvent event;
-        event = status.equals(JobStatus.FAILURE) ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
-                : FeedLifecycleEvent.FEED_ENDED;
-
-        // remove feed joints
-        deregisterFeedIntakeJob(message.jobId);
-
-        // notify event listeners 
-        notifyFeedEventSubscribers(intakeInfo, event);
-
-    }
-
-    private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
-        FeedConnectionId connectionId = cInfo.getConnectionId();
-
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
-        JobStatus status = info.getStatus();
-        boolean failure = status != null && status.equals(JobStatus.FAILURE);
-        FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
-
-        boolean removeJobHistory = !failure;
-        boolean retainSubsription = cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
-                || (failure && fpa.continueOnHardwareFailure());
-
-        if (!retainSubsription) {
-            IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
-            feedJoint.removeReceiver(connectionId);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
-            }
-            removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
-        }
-
-        if (removeJobHistory) {
-            connectJobInfos.remove(connectionId);
-            jobInfos.remove(cInfo.getJobId());
-            feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
-        }
-        deregisterFeedActivity(cInfo);
-
-        // notify event listeners 
-        FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
-        notifyFeedEventSubscribers(cInfo, event);
-    }
-
-    private void registerFeedActivity(FeedConnectJobInfo cInfo) {
-        Map<String, String> feedActivityDetails = new HashMap<String, String>();
-
-        if (cInfo.getCollectLocations() != null) {
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.INTAKE_LOCATIONS,
-                    StringUtils.join(cInfo.getCollectLocations().iterator(), ','));
-        }
-
-        if (cInfo.getComputeLocations() != null) {
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS,
-                    StringUtils.join(cInfo.getComputeLocations().iterator(), ','));
-        }
-
-        if (cInfo.getStorageLocations() != null) {
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS,
-                    StringUtils.join(cInfo.getStorageLocations().iterator(), ','));
-        }
-
-        String policyName = cInfo.getFeedPolicy().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-        feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
-
-        feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
-        try {
-            FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), cInfo
-                    .getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
-                    feedActivityDetails);
-            CentralFeedManager.getInstance().getFeedLoadManager()
-                    .reportFeedActivity(cInfo.getConnectionId(), feedActivity);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to register feed activity for " + cInfo + " " + e.getMessage());
-            }
-
-        }
-
-    }
-
-    public void deregisterFeedActivity(FeedConnectJobInfo cInfo) {
-        try {
-            CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(cInfo.getConnectionId());
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to deregister feed activity for " + cInfo + " " + e.getMessage());
-            }
-        }
-    }
-
-    public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId());
-
-        IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
-        List<FeedConnectionId> all = sourceJoint.getReceivers();
-        boolean removeSourceJoint = all.size() < 2;
-        if (removeSourceJoint) {
-            feedJoints.remove(sourceJoint);
-        }
-
-        IFeedJoint computeJoint = cInfo.getComputeFeedJoint();
-        if (computeJoint != null && computeJoint.getReceivers().size() < 2) {
-            feedJoints.remove(computeJoint);
-        }
-    }
-
-    public boolean isRegisteredFeedJob(JobId jobId) {
-        return jobInfos.get(jobId) != null;
-    }
-
-    public List<String> getFeedComputeLocations(FeedId feedId) {
-        List<IFeedJoint> feedJoints = feedPipeline.get(feedId);
-        for (IFeedJoint joint : feedJoints) {
-            if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
-                return connectJobInfos.get(joint.getProvider()).getComputeLocations();
-            }
-        }
-        return null;
-    }
-
-    public List<String> getFeedStorageLocations(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getStorageLocations();
-    }
-
-    public List<String> getFeedCollectLocations(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getCollectLocations();
-    }
-
-    public List<String> getFeedIntakeLocations(FeedId feedId) {
-        return intakeJobInfos.get(feedId).getIntakeLocation();
-    }
-
-    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getJobId();
-    }
-
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
-        List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
-        if (subscribers == null) {
-            subscribers = new ArrayList<IFeedLifecycleEventSubscriber>();
-            eventSubscribers.put(connectionId, subscribers);
-        }
-        subscribers.add(subscriber);
-    }
-
-    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
-        List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
-        if (subscribers != null) {
-            subscribers.remove(subscriber);
-        }
-    }
-
-    //============================
-
-    public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
-        List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
-        if (joints != null && !joints.isEmpty()) {
-            for (IFeedJoint joint : joints) {
-                if (joint.getFeedJointKey().equals(feedJointKey)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    public Collection<IFeedJoint> getFeedIntakeJoints() {
-        List<IFeedJoint> intakeFeedPoints = new ArrayList<IFeedJoint>();
-        for (FeedIntakeInfo info : intakeJobInfos.values()) {
-            intakeFeedPoints.add(info.getIntakeFeedJoint());
-        }
-        return intakeFeedPoints;
-    }
-
-    public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
-        List<IFeedJoint> joints = feedPipeline.get(feedPointKey.getFeedId());
-        if (joints != null && !joints.isEmpty()) {
-            for (IFeedJoint joint : joints) {
-                if (joint.getFeedJointKey().equals(feedPointKey)) {
-                    return joint;
-                }
-            }
-        }
-        return null;
-    }
-
-    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
-        IFeedJoint feedJoint = getFeedJoint(feedJointKey);
-        if (feedJoint != null) {
-            return feedJoint;
-        } else {
-            String jointKeyString = feedJointKey.getStringRep();
-            List<IFeedJoint> jointsOnPipeline = feedPipeline.get(feedJointKey.getFeedId());
-            IFeedJoint candidateJoint = null;
-            if (jointsOnPipeline != null) {
-                for (IFeedJoint joint : jointsOnPipeline) {
-                    if (jointKeyString.contains(joint.getFeedJointKey().getStringRep())) {
-                        if (candidateJoint == null) {
-                            candidateJoint = joint;
-                        } else if (joint.getFeedJointKey().getStringRep()
-                                .contains(candidateJoint.getFeedJointKey().getStringRep())) { // found feed point is a super set of the earlier find
-                            candidateJoint = joint;
-                        }
-                    }
-                }
-            }
-            return candidateJoint;
-        }
-    }
-
-    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId).getSpec();
-    }
-
-    public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) {
-        List<IFeedJoint> joints = feedPipeline.get(sourceFeedId);
-        for (IFeedJoint joint : joints) {
-            if (joint.getType().equals(type)) {
-                return joint;
-            }
-        }
-        return null;
-    }
-
-    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
-        return connectJobInfos.get(connectionId);
-    }
-
-    private void setLocations(FeedConnectJobInfo cInfo) {
-        JobSpecification jobSpec = cInfo.getSpec();
-
-        List<OperatorDescriptorId> collectOperatorIds = new ArrayList<OperatorDescriptorId>();
-        List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
-        List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
-
-        Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-            IOperatorDescriptor opDesc = entry.getValue();
-            IOperatorDescriptor actualOp = null;
-            if (opDesc instanceof FeedMetaOperatorDescriptor) {
-                actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
-            } else {
-                actualOp = opDesc;
-            }
-
-            if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
-                AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) actualOp);
-                IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
-                boolean computeOp = false;
-                for (IPushRuntimeFactory rf : runtimeFactories) {
-                    if (rf instanceof AssignRuntimeFactory) {
-                        IConnectorDescriptor connDesc = jobSpec.getOperatorInputMap().get(op.getOperatorId()).get(0);
-                        IOperatorDescriptor sourceOp = jobSpec.getConnectorOperatorMap().get(connDesc.getConnectorId())
-                                .getLeft().getLeft();
-                        if (sourceOp instanceof FeedCollectOperatorDescriptor) {
-                            computeOp = true;
-                            break;
-                        }
-                    }
-                }
-                if (computeOp) {
-                    computeOperatorIds.add(entry.getKey());
-                }
-            } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
-                storageOperatorIds.add(entry.getKey());
-            } else if (actualOp instanceof FeedCollectOperatorDescriptor) {
-                collectOperatorIds.add(entry.getKey());
-            }
-        }
-
-        try {
-            IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-            JobInfo info = hcc.getJobInfo(cInfo.getJobId());
-            List<String> collectLocations = new ArrayList<String>();
-            for (OperatorDescriptorId collectOpId : collectOperatorIds) {
-                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(collectOpId);
-                int nOperatorInstances = operatorLocations.size();
-                for (int i = 0; i < nOperatorInstances; i++) {
-                    collectLocations.add(operatorLocations.get(i));
-                }
-            }
-
-            List<String> computeLocations = new ArrayList<String>();
-            for (OperatorDescriptorId computeOpId : computeOperatorIds) {
-                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
-                if (operatorLocations != null) {
-                    int nOperatorInstances = operatorLocations.size();
-                    for (int i = 0; i < nOperatorInstances; i++) {
-                        computeLocations.add(operatorLocations.get(i));
-                    }
-                } else {
-                    computeLocations.clear();
-                    computeLocations.addAll(collectLocations);
-                }
-            }
-
-            List<String> storageLocations = new ArrayList<String>();
-            for (OperatorDescriptorId storageOpId : storageOperatorIds) {
-                Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
-                if (operatorLocations == null) {
-                    continue;
-                }
-                int nOperatorInstances = operatorLocations.size();
-                for (int i = 0; i < nOperatorInstances; i++) {
-                    storageLocations.add(operatorLocations.get(i));
-                }
-            }
-            cInfo.setCollectLocations(collectLocations);
-            cInfo.setComputeLocations(computeLocations);
-            cInfo.setStorageLocations(storageLocations);
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJoint.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJoint.java
deleted file mode 100644
index a76a1e9..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedJoint.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConnectionRequest;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedJointKey;
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
-
-public class FeedJoint implements IFeedJoint {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJoint.class.getName());
-
-    /** A unique key associated with the feed point **/
-    private final FeedJointKey key;
-
-    /** The state associated with the FeedJoint **/
-    private State state;
-
-    /** A list of subscribers that receive data from this FeedJoint **/
-    private final List<FeedConnectionId> receivers;
-
-    /** The feedId on which the feedPoint resides **/
-    private final FeedId ownerFeedId;
-
-    /** A list of feed subscription requests submitted for subscribing to the FeedPoint's data **/
-    private final List<FeedConnectionRequest> connectionRequests;
-
-    private final ConnectionLocation connectionLocation;
-
-    private final FeedJointType type;
-
-    private FeedConnectionId provider;
-
-    public FeedJoint(FeedJointKey key, FeedId ownerFeedId, ConnectionLocation subscriptionLocation, FeedJointType type,
-            FeedConnectionId provider) {
-        this.key = key;
-        this.ownerFeedId = ownerFeedId;
-        this.type = type;
-        this.receivers = new ArrayList<FeedConnectionId>();
-        this.state = State.CREATED;
-        this.connectionLocation = subscriptionLocation;
-        this.connectionRequests = new ArrayList<FeedConnectionRequest>();
-        this.provider = provider;
-    }
-
-    @Override
-    public int hashCode() {
-        return key.hashCode();
-    }
-
-    public void addReceiver(FeedConnectionId connectionId) {
-        receivers.add(connectionId);
-    }
-
-    public void removeReceiver(FeedConnectionId connectionId) {
-        receivers.remove(connectionId);
-    }
-
-    public synchronized void addConnectionRequest(FeedConnectionRequest request) {
-        connectionRequests.add(request);
-        if (state.equals(State.ACTIVE)) {
-            handlePendingConnectionRequest();
-        }
-    }
-
-    public synchronized void setState(State state) {
-        if (this.state.equals(state)) {
-            return;
-        }
-        this.state = state;
-        if (this.state.equals(State.ACTIVE)) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Feed joint " + this + " is now " + State.ACTIVE);
-            }
-            handlePendingConnectionRequest();
-        }
-    }
-
-    private void handlePendingConnectionRequest() {
-        for (FeedConnectionRequest connectionRequest : connectionRequests) {
-            FeedConnectionId connectionId = new FeedConnectionId(connectionRequest.getReceivingFeedId(),
-                    connectionRequest.getTargetDataset());
-            try {
-                FeedLifecycleListener.INSTANCE.submitFeedConnectionRequest(this, connectionRequest);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Submitted feed connection request " + connectionRequest + " at feed joint " + this);
-                }
-                addReceiver(connectionId);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unsuccessful attempt at submitting connection request " + connectionRequest
-                            + " at feed joint " + this + ". Message " + e.getMessage());
-                }
-                e.printStackTrace();
-            }
-        }
-        connectionRequests.clear();
-    }
-
-    public FeedConnectionId getReceiver(FeedConnectionId connectionId) {
-        for (FeedConnectionId cid : receivers) {
-            if (cid.equals(connectionId)) {
-                return cid;
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return key.toString() + " [" + connectionLocation + "]" + "[" + state + "]";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null) {
-            return false;
-        }
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof FeedJoint)) {
-            return false;
-        }
-        return ((FeedJoint) o).getFeedJointKey().equals(this.key);
-    }
-
-    public FeedId getOwnerFeedId() {
-        return ownerFeedId;
-    }
-
-    public List<FeedConnectionRequest> getConnectionRequests() {
-        return connectionRequests;
-    }
-
-    public ConnectionLocation getConnectionLocation() {
-        return connectionLocation;
-    }
-
-    public FeedJointType getType() {
-        return type;
-    }
-
-    @Override
-    public FeedConnectionId getProvider() {
-        return provider;
-    }
-
-    public List<FeedConnectionId> getReceivers() {
-        return receivers;
-    }
-
-    public FeedJointKey getKey() {
-        return key;
-    }
-
-    public synchronized State getState() {
-        return state;
-    }
-
-    @Override
-    public FeedJointKey getFeedJointKey() {
-        return key;
-    }
-
-}