Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:18 UTC

[21/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java
new file mode 100644
index 0000000..bd0c09b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedMetadataManager.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+import edu.uci.ics.asterix.metadata.feeds.XAQLFeedMessage;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedMetadataManager implements IFeedMetadataManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
+
+    private final String nodeId;
+    private ARecordType recordType;
+
+    public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
+        this.nodeId = nodeId;
+        String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
+                "timestamp" };
+        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+        recordType = new ARecordType(FeedBootstrap.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
+    }
+
+    @Override
+    public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
+            throws AsterixException {
+        try {
+            AString id = new AString("1");
+            AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
+            AString feedValue = new AString(connectionId.getFeedId().getFeedName());
+            AString targetDatasetValue = new AString(connectionId.getDatasetName());
+            AString tupleValue = new AString(tuple);
+            AString messageValue = new AString(message);
+            AString dateTime = new AString(new Date().toString());
+
+            IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
+                    messageValue, dateTime };
+            ARecord record = new ARecord(recordType, fields);
+            StringBuilder builder = new StringBuilder();
+            builder.append("use dataverse " + FeedBootstrap.FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("insert into dataset " + FeedBootstrap.FAILED_TUPLE_DATASET + " ");
+            builder.append(" (" + recordToString(record) + ")");
+            builder.append(";");
+
+            XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
+            feedManager.getFeedMessageService().sendMessage(xAqlMessage);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(" Sent " + xAqlMessage.toJSON());
+            }
+        } catch (Exception pe) {
+            throw new AsterixException(pe);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "FeedMetadataManager [" + nodeId + "]";
+    }
+
+    private String recordToString(ARecord record) {
+        String[] fieldNames = record.getType().getFieldNames();
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ ");
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append("\"" + fieldNames[i] + "\"");
+            sb.append(": ");
+            switch (record.getType().getFieldTypes()[i].getTypeTag()) {
+                case STRING:
+                    sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
+                    break;
+            }
+        }
+        sb.append(" }");
+        return sb.toString();
+    }
+}
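
For reference, logTuple assembles an AQL statement of the following shape; the dataverse and dataset names come from FeedBootstrap constants, and the field values here are hypothetical:

    use dataverse feeds_metadata;
    insert into dataset failed_tuple
     ( { "id": "1", "dataverseName": "experiments", "feedName": "TwitterFeed", "targetDataset": "Tweets", "tuple": "<raw tuple text>", "message": "<failure cause>", "timestamp": "Mon Jun 29 12:45:18 PDT 2015" } );

The statement is not executed locally; it is wrapped in an XAQLFeedMessage and shipped through the feed message service.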

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java
new file mode 100644
index 0000000..c55c132
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedTrackingManager.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedTrackingManager implements IFeedTrackingManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
+
+    private final BitSet allOnes;
+
+    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
+    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
+
+    public FeedTrackingManager() {
+        byte[] allOneBytes = new byte[128];
+        Arrays.fill(allOneBytes, (byte) 0xff);
+        allOnes = BitSet.valueOf(allOneBytes);
+        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
+        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
+    }
+
+    @Override
+    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
+        AckId ackId = getAckId(ackMessage);
+        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
+        if (acksForConnection == null) {
+            acksForConnection = new HashMap<AckId, BitSet>();
+            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
+            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
+        }
+        BitSet currentAcks = acksForConnection.get(ackId);
+        if (currentAcks == null) {
+            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
+            acksForConnection.put(ackId, currentAcks);
+        } else {
+            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
+        }
+        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is covered");
+            }
+            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
+            if (maxBaseAckedForConnection == null) {
+                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
+                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
+            }
+            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
+            if (maxBaseAckedValue == null) {
+                maxBaseAckedValue = ackMessage.getBase();
+                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+                        ackMessage.getBase());
+            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
+                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+                        ackMessage.getBase());
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
+                }
+            }
+
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
+            }
+        }
+    }
+
+    public synchronized void disableTracking(FeedConnectionId connectionId) {
+        ackHistory.remove(connectionId);
+        maxBaseAcked.remove(connectionId);
+    }
+
+    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
+        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+        String collectLocation = collectLocations.get(partition);
+        Set<String> messageDestinations = new HashSet<String>();
+        messageDestinations.add(collectLocation);
+        messageDestinations.addAll(storageLocations);
+        try {
+            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
+            CentralFeedManager.runJob(spec, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
+            }
+        }
+    }
+
+    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
+        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
+    }
+
+    private static class AckId {
+        private FeedConnectionId connectionId;
+        private int intakePartition;
+        private int base;
+
+        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
+            this.connectionId = connectionId;
+            this.intakePartition = intakePartition;
+            this.base = base;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof AckId)) {
+                return false;
+            }
+            AckId other = (AckId) o;
+            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
+                    && other.getBase() == base;
+        }
+
+        @Override
+        public String toString() {
+            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
+        }
+
+        @Override
+        public int hashCode() {
+            return toString().hashCode();
+        }
+
+        public FeedConnectionId getConnectionId() {
+            return connectionId;
+        }
+
+        public int getIntakePartition() {
+            return intakePartition;
+        }
+
+        public int getBase() {
+            return base;
+        }
+
+    }
+
+    @Override
+    public void disableAcking(FeedConnectionId connectionId) {
+        ackHistory.remove(connectionId);
+        maxBaseAcked.remove(connectionId);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Acking disabled for " + connectionId);
+        }
+    }
+
+}
\ No newline at end of file
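
The completion test in submitAckReport treats 128 bytes of 0xff (1024 tuple slots) as full coverage for a (connection, intake partition, base) AckId. A self-contained sketch of the accumulate-and-test idiom used above; the two half-coverage ack reports are hypothetical:

    import java.util.Arrays;
    import java.util.BitSet;

    public class AckCoverageDemo {
        public static void main(String[] args) {
            byte[] allOneBytes = new byte[128];
            Arrays.fill(allOneBytes, (byte) 0xff);
            BitSet allOnes = BitSet.valueOf(allOneBytes);   // completion mask, as in the constructor above

            byte[] firstHalf = new byte[128];               // ack report covering slots 0..511
            Arrays.fill(firstHalf, 0, 64, (byte) 0xff);
            byte[] secondHalf = new byte[128];              // ack report covering slots 512..1023
            Arrays.fill(secondHalf, 64, 128, (byte) 0xff);

            BitSet currentAcks = BitSet.valueOf(firstHalf);
            currentAcks.or(BitSet.valueOf(secondHalf));     // accumulate, as submitAckReport does

            System.out.println("covered: " + Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray()));
            System.out.println("pending: " + (128 * 8 - currentAcks.cardinality()));
        }
    }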

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java
new file mode 100644
index 0000000..834784c
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedWorkRequestResponseHandler.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedWorkRequestResponseHandler implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
+
+    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
+
+    private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
+
+    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
+        this.inbox = inbox;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            IClusterManagementWorkResponse response = null;
+            try {
+                response = inbox.take();
+            } catch (InterruptedException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Interrupted exception " + e.getMessage());
+                }
+                continue; // response is still null; retry the take()
+            }
+            IClusterManagementWork submittedWork = response.getWork();
+            Map<String, String> nodeSubstitution = new HashMap<String, String>();
+            switch (submittedWork.getClusterManagementWorkType()) {
+                case ADD_NODE:
+                    AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
+                    int workId = addNodeWork.getWorkId();
+                    Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
+                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+                    List<String> nodesAdded = resp.getNodesAdded();
+                    List<String> unsubstitutedNodes = new ArrayList<String>();
+                    unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
+                    int nodeIndex = 0;
+
+                    /** form a mapping between the failed node and its substitute **/
+                    if (nodesAdded != null && nodesAdded.size() > 0) {
+                        for (String failedNodeId : addNodeWork.getDeadNodes()) {
+                            String substitute = nodesAdded.get(nodeIndex);
+                            nodeSubstitution.put(failedNodeId, substitute);
+                            nodeIndex = (nodeIndex + 1) % nodesAdded.size();
+                            unsubstitutedNodes.remove(failedNodeId);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
+                            }
+                        }
+                    }
+                    if (unsubstitutedNodes.size() > 0) {
+                        String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(
+                                new String[] {});
+                        nodeIndex = 0;
+                        for (String unsubstitutedNode : unsubstitutedNodes) {
+                            nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
+                                        + unsubstitutedNode);
+                            }
+                            nodeIndex = (nodeIndex + 1) % participantNodes.length;
+                        }
+
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
+                        }
+                    }
+
+                    // patch the specs of all feed jobs impacted by the failed node(s)
+
+                    for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
+                        String failedNode = entry.getKey();
+                        List<FeedJobInfo> impactedJobInfos = entry.getValue();
+                        for (FeedJobInfo info : impactedJobInfos) {
+                            JobSpecification spec = info.getSpec();
+                            replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
+                            info.setSpec(spec);
+                        }
+                    }
+
+                    Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
+                    Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
+
+                    for (List<FeedJobInfo> infos : failureAnalysis.values()) {
+                        for (FeedJobInfo info : infos) {
+                            switch (info.getJobType()) {
+                                case INTAKE:
+                                    revisedIntakeJobs.add((FeedIntakeInfo) info);
+                                    break;
+                                case FEED_CONNECT:
+                                    revisedConnectJobInfos.add((FeedConnectJobInfo) info);
+                                    break;
+                            }
+                        }
+                    }
+
+                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+                    try {
+                        for (FeedIntakeInfo info : revisedIntakeJobs) {
+                            hcc.startJob(info.getSpec());
+                        }
+                        Thread.sleep(2000);
+                        for (FeedConnectJobInfo info : revisedConnectJobInfos) {
+                            hcc.startJob(info.getSpec());
+                            Thread.sleep(2000);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to start revised job post failure");
+                        }
+                    }
+
+                    break;
+                case REMOVE_NODE:
+                    throw new IllegalStateException("Invalid work submitted");
+            }
+        }
+    }
+
+    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
+        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
+        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+        OperatorDescriptorId opId = null;
+        for (Constraint constraint : userConstraints) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT:
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    if (modifiedOperators.contains(opId)) {
+                        countConstraintsToReplace.add(constraint);
+                    } else {
+                        List<Constraint> clist = candidateConstraints.get(opId);
+                        if (clist == null) {
+                            clist = new ArrayList<Constraint>();
+                            candidateConstraints.put(opId, clist);
+                        }
+                        clist.add(constraint);
+                    }
+                    break;
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                    if (oldLocation.equals(failedNodeId)) {
+                        locationConstraintsToReplace.add(constraint);
+                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+                        Map<Integer, String> newLocs = newConstraints.get(opId);
+                        if (newLocs == null) {
+                            newLocs = new HashMap<Integer, String>();
+                            newConstraints.put(opId, newLocs);
+                        }
+                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                        newLocs.put(partition, replacementNode);
+                    } else {
+                        if (modifiedOperators.contains(opId)) {
+                            locationConstraintsToReplace.add(constraint);
+                            Map<Integer, String> newLocs = newConstraints.get(opId);
+                            if (newLocs == null) {
+                                newLocs = new HashMap<Integer, String>();
+                                newConstraints.put(opId, newLocs);
+                            }
+                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                            newLocs.put(partition, oldLocation);
+                        } else {
+                            List<Constraint> clist = candidateConstraints.get(opId);
+                            if (clist == null) {
+                                clist = new ArrayList<Constraint>();
+                                candidateConstraints.put(opId, clist);
+                            }
+                            clist.add(constraint);
+                        }
+                    }
+                    break;
+            }
+        }
+
+        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+        for (OperatorDescriptorId mopId : modifiedOperators) {
+            List<Constraint> clist = candidateConstraints.get(mopId);
+            if (clist != null && !clist.isEmpty()) {
+                jobSpec.getUserConstraints().removeAll(clist);
+
+                for (Constraint c : clist) {
+                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+                        ConstraintExpression cexpr = c.getRValue();
+                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
+                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                        newConstraints.get(mopId).put(partition, oldLocation);
+                    }
+                }
+            }
+        }
+
+        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
+            OperatorDescriptorId nopId = entry.getKey();
+            Map<Integer, String> clist = entry.getValue();
+            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+            String[] locations = new String[clist.size()];
+            for (int i = 0; i < locations.length; i++) {
+                locations[i] = clist.get(i);
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
+        }
+
+    }
+
+    public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
+        feedsWaitingForResponse.put(workId, impactedJobs);
+    }
+}
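
The ADD_NODE branch assigns substitutes round-robin, first over the freshly added nodes and then over existing participant nodes for any failures left over. A minimal sketch of that mapping logic (node ids hypothetical):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class NodeSubstitutionDemo {
        public static void main(String[] args) {
            List<String> deadNodes = Arrays.asList("nc1", "nc3", "nc5");
            List<String> nodesAdded = Arrays.asList("nc7", "nc8"); // fewer substitutes than failures
            Map<String, String> nodeSubstitution = new HashMap<String, String>();
            int nodeIndex = 0;
            for (String failedNodeId : deadNodes) {
                nodeSubstitution.put(failedNodeId, nodesAdded.get(nodeIndex));
                nodeIndex = (nodeIndex + 1) % nodesAdded.size(); // wrap around when substitutes run short
            }
            System.out.println(nodeSubstitution); // {nc1=nc7, nc3=nc8, nc5=nc7} (order may vary)
        }
    }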

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java
new file mode 100644
index 0000000..4b5c8ef
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/feeds/FeedsActivator.java
@@ -0,0 +1,95 @@
+package edu.uci.ics.asterix.feeds;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class FeedsActivator implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedsActivator.class.getName());
+
+    private List<FeedCollectInfo> feedsToRevive;
+    private Mode mode;
+
+    public enum Mode {
+        REVIVAL_POST_CLUSTER_REBOOT,
+        REVIVAL_POST_NODE_REJOIN
+    }
+
+    public FeedsActivator() {
+        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+    }
+
+    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
+        this.feedsToRevive = feedsToRevive;
+        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+    }
+
+    @Override
+    public void run() {
+        switch (mode) {
+            case REVIVAL_POST_CLUSTER_REBOOT:
+                //revivePostClusterReboot();
+                break;
+            case REVIVAL_POST_NODE_REJOIN:
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e1) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Attempt to resume feed interrupted");
+                    }
+                    throw new IllegalStateException(e1.getMessage());
+                }
+                for (FeedCollectInfo finfo : feedsToRevive) {
+                    try {
+                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+                            LOGGER.info("Job:" + finfo.jobSpec);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
+                        }
+                    }
+                }
+        }
+    }
+
+    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+        PrintWriter writer = new PrintWriter(System.out, true);
+        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+        try {
+            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
+                    new Identifier(dataset), feedPolicy, 0);
+            stmt.setForceConnect(true);
+            List<Statement> statements = new ArrayList<Statement>();
+            statements.add(dataverseDecl);
+            statements.add(stmt);
+            AqlTranslator translator = new AqlTranslator(statements, pc);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
+                        + feedPolicy + " Exception " + e.getMessage());
+            }
+        }
+    }
+}
\ No newline at end of file
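
The statements that reviveFeed constructs programmatically correspond roughly to the following AQL, executed with force-connect set (dataverse, feed, dataset, and policy names hypothetical):

    use dataverse experiments;
    connect feed TwitterFeed to dataset Tweets using policy Basic;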

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index 5e52f3e..28ee1f2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -14,15 +14,34 @@
  */
 package edu.uci.ics.asterix.file;
 
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.List;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedJoint;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.message.EndFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.PrepareStallMessage;
+import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -33,44 +52,200 @@ import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
  */
 public class FeedOperations {
 
-    private static final Logger LOGGER = Logger.getLogger(IndexOperations.class.getName());
-
     /**
-     * @param controlFeedStatement
-     *            The statement representing the action that describes the
-     *            action that needs to be taken on the feed. E.g. of actions are
-     *            stop feed or alter feed.
+     * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
+     * 
+     * @param primaryFeed
      * @param metadataProvider
-     *            An instance of the MetadataProvider
-     * @return An instance of JobSpec for the job that would send an appropriate
-     *         control message to the running feed.
-     * @throws AsterixException
-     * @throws AlgebricksException
+     * @return JobSpecification the Hyracks job specification for receiving data from the external source
+     * @throws Exception
      */
-    public static JobSpecification buildDisconnectFeedJobSpec(String dataverseName, String feedName,
-            String datasetName, AqlMetadataProvider metadataProvider, FeedActivity feedActivity)
-            throws AsterixException, AlgebricksException {
+    public static Pair<JobSpecification, IFeedAdapterFactory> buildFeedIntakeJobSpec(PrimaryFeed primaryFeed,
+            AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
 
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IOperatorDescriptor feedMessenger;
-        AlgebricksPartitionConstraint messengerPc;
+        spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE);
+        IFeedAdapterFactory adapterFactory = null;
+        IOperatorDescriptor feedIngestor;
+        AlgebricksPartitionConstraint ingesterPc;
 
         try {
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider
-                    .buildDisconnectFeedMessengerRuntime(spec, dataverseName, feedName, datasetName, feedActivity);
-            feedMessenger = p.first;
-            messengerPc = p.second;
+            Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> t = metadataProvider
+                    .buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor);
+            feedIngestor = t.first;
+            ingesterPc = t.second;
+            adapterFactory = t.third;
         } catch (AlgebricksException e) {
+            e.printStackTrace();
             throw new AsterixException(e);
         }
 
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc);
+        spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0);
+        spec.addRoot(nullSink);
+        return new Pair<JobSpecification, IFeedAdapterFactory>(spec, adapterFactory);
+    }
+
+    public static JobSpecification buildDiscontinueFeedSourceSpec(AqlMetadataProvider metadataProvider, FeedId feedId)
+            throws AsterixException, AlgebricksException {
+
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IOperatorDescriptor feedMessenger = null;
+        AlgebricksPartitionConstraint messengerPc = null;
+
+        List<String> locations = FeedLifecycleListener.INSTANCE.getIntakeLocations(feedId);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDiscontinueFeedMessengerRuntime(spec, feedId,
+                locations);
+
+        feedMessenger = p.first;
+        messengerPc = p.second;
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
         spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
         spec.addRoot(nullSink);
+
         return spec;
+    }
+
+    /**
+     * Builds the job spec for sending a message to an active feed to disconnect it from
+     * its source.
+     */
+    public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(AqlMetadataProvider metadataProvider,
+            FeedConnectionId connectionId) throws AsterixException, AlgebricksException {
+
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IOperatorDescriptor feedMessenger;
+        AlgebricksPartitionConstraint messengerPc;
+        List<String> locations = null;
+        FeedRuntimeType sourceRuntimeType;
+        try {
+            FeedConnectJobInfo cInfo = FeedLifecycleListener.INSTANCE.getFeedConnectJobInfo(connectionId);
+            IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint();
+            IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint();
+
+            boolean terminateIntakeJob = false;
+            boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty();
+            if (completeDisconnect) {
+                sourceRuntimeType = FeedRuntimeType.INTAKE;
+                locations = cInfo.getCollectLocations();
+                terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1;
+            } else {
+                locations = cInfo.getComputeLocations();
+                sourceRuntimeType = FeedRuntimeType.COMPUTE;
+            }
+
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec,
+                    connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId());
+
+            feedMessenger = p.first;
+            messengerPc = p.second;
+
+            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+            NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+            spec.addRoot(nullSink);
+            return new Pair<JobSpecification, Boolean>(spec, terminateIntakeJob);
+
+        } catch (AlgebricksException e) {
+            throw new AsterixException(e);
+        }
+
+    }
+
+    public static JobSpecification buildPrepareStallMessageJob(PrepareStallMessage stallMessage,
+            Collection<String> collectLocations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, stallMessage.getConnectionId(), stallMessage, collectLocations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
+    public static JobSpecification buildNotifyThrottlingEnabledMessageJob(
+            ThrottlingEnabledFeedMessage throttlingEnabledMesg, Collection<String> locations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, throttlingEnabledMesg.getConnectionId(), throttlingEnabledMesg, locations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
+    public static JobSpecification buildTerminateFlowMessageJob(TerminateDataFlowMessage terminateMessage,
+            List<String> collectLocations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, terminateMessage.getConnectionId(), terminateMessage, collectLocations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
+    public static JobSpecification buildCommitAckResponseJob(FeedTupleCommitResponseMessage commitResponseMessage,
+            Collection<String> targetLocations) throws AsterixException {
+        JobSpecification messageJobSpec = JobSpecificationUtils.createJobSpecification();
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = FeedOperations.buildSendFeedMessageRuntime(
+                    messageJobSpec, commitResponseMessage.getConnectionId(), commitResponseMessage, targetLocations);
+            buildSendFeedMessageJobSpec(p.first, p.second, messageJobSpec);
+        } catch (AlgebricksException ae) {
+            throw new AsterixException(ae);
+        }
+        return messageJobSpec;
+    }
+
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDiscontinueFeedMessengerRuntime(
+            JobSpecification jobSpec, FeedId feedId, List<String> locations) throws AlgebricksException {
+        FeedConnectionId feedConnectionId = new FeedConnectionId(feedId, null);
+        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, FeedRuntimeType.INTAKE,
+                feedConnectionId.getFeedId(), true, EndFeedMessage.EndMessageType.DISCONTINUE_SOURCE);
+        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
+    }
+
+    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
+            JobSpecification jobSpec, FeedConnectionId feedConnectionId, IFeedMessage feedMessage,
+            Collection<String> locations) throws AlgebricksException {
+        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                locations.toArray(new String[] {}));
+        FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, feedConnectionId,
+                feedMessage);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
+    }
+
+    private static JobSpecification buildSendFeedMessageJobSpec(IOperatorDescriptor operatorDescriptor,
+            AlgebricksPartitionConstraint messengerPc, JobSpecification messageJobSpec) {
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, operatorDescriptor,
+                messengerPc);
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(messageJobSpec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(messageJobSpec, nullSink, messengerPc);
+        messageJobSpec.connect(new OneToOneConnectorDescriptor(messageJobSpec), operatorDescriptor, 0, nullSink, 0);
+        messageJobSpec.addRoot(nullSink);
+        return messageJobSpec;
+    }
 
+    private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
+            JobSpecification jobSpec, FeedConnectionId feedConnectionId, List<String> locations,
+            FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, FeedId sourceFeedId)
+            throws AlgebricksException {
+        IFeedMessage feedMessage = new EndFeedMessage(feedConnectionId, sourceFeedRuntimeType, sourceFeedId,
+                completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED);
+        return buildSendFeedMessageRuntime(jobSpec, feedConnectionId, feedMessage, locations);
     }
 }
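
All of the message-job builders above share one shape: construct a FeedMessageOperatorDescriptor pinned to the target locations, route it into a NullSinkOperatorDescriptor, and submit the spec. A usage sketch mirroring the call site in FeedTrackingManager.sendCommitResponseMessage, with 'response' and 'messageDestinations' assumed in scope as in that method:

    JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
    CentralFeedManager.runJob(spec, false); // flags as passed at the existing call site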

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
index 0af6a7e..8e633a9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -19,16 +19,17 @@ import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
 import edu.uci.ics.asterix.file.ExternalIndexingOperations;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
@@ -36,14 +37,14 @@ import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
 
-    private static State state;
+    private static ClusterState state;
     private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
     private HyracksConnection hcc;
     public static AsterixGlobalRecoveryManager INSTANCE;
@@ -63,8 +64,8 @@ public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
     @Override
     public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
         // perform global recovery if state changed to active
-        final State newState = AsterixClusterProperties.INSTANCE.getState();
-        boolean needToRecover = !newState.equals(state) && (newState == State.ACTIVE);
+        final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+        boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
         if (needToRecover) {
             Thread recoveryThread = new Thread(new Runnable() {
                 @Override
@@ -79,7 +80,7 @@ public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
                         List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
                         for (Dataverse dataverse : dataverses) {
                             if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
-                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse);
+                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
                                 List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
                                         dataverse.getDataverseName());
                                 for (Dataset dataset : datasets) {
@@ -206,7 +207,7 @@ public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
     }
 
     @Override
-    public void notifyStateChange(State previousState, State newState) {
+    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
         // Do nothing?
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 9fa9a76..6fcc248 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -26,8 +26,6 @@ import edu.uci.ics.asterix.api.http.servlet.APIServlet;
 import edu.uci.ics.asterix.api.http.servlet.AQLAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.ConnectorAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
-import edu.uci.ics.asterix.api.http.servlet.FeedDashboardServlet;
-import edu.uci.ics.asterix.api.http.servlet.FeedDataProviderServlet;
 import edu.uci.ics.asterix.api.http.servlet.FeedServlet;
 import edu.uci.ics.asterix.api.http.servlet.QueryAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
@@ -36,6 +34,9 @@ import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
 import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.feeds.FeedLifecycleListener;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
@@ -55,6 +56,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
     private Server webServer;
     private Server jsonAPIServer;
     private Server feedServer;
+    private ICentralFeedManager centralFeedManager;
 
     private static IAsterixStateProxy proxy;
     private ICCApplicationContext appCtx;
@@ -89,6 +91,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
         setupFeedServer(externalProperties);
         feedServer.start();
+        centralFeedManager = CentralFeedManager.getInstance(); 
+        centralFeedManager.start();
 
         waitUntilServerStart(webServer);
         waitUntilServerStart(jsonAPIServer);
@@ -171,9 +175,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
         feedServer.setHandler(context);
         context.addServlet(new ServletHolder(new FeedServlet()), "/");
-        context.addServlet(new ServletHolder(new FeedDashboardServlet()), "/feed/dashboard");
-        context.addServlet(new ServletHolder(new FeedDataProviderServlet()), "/feed/data");
-
+   
         // add paths here
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 950afe4..c8f778d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -23,15 +23,15 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse.Status;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
 import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
 import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
 import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWorkResponse;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -141,8 +141,8 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         for (IClusterManagementWork w : workSet) {
             switch (w.getClusterManagementWorkType()) {
                 case ADD_NODE:
-                    if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
-                        nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+                    if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodesRequested()) {
+                        nodesToAdd = ((AddNodeWork) w).getNumberOfNodesRequested();
                     }
                     nodeAdditionRequests.add((AddNodeWork) w);
                     break;
@@ -181,7 +181,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
         }
 
         for (AddNodeWork w : nodeAdditionRequests) {
-            int n = w.getNumberOfNodes();
+            int n = w.getNumberOfNodesRequested();
             List<String> nodesToBeAddedForWork = new ArrayList<String>();
             for (int i = 0; i < n && i < addedNodes.size(); i++) {
                 nodesToBeAddedForWork.add(addedNodes.get(i));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 77581c7..e05f406 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -20,9 +20,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
 import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
 import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
@@ -50,8 +50,8 @@ public class ClusterWorkExecutor implements Runnable {
                 for (IClusterManagementWork w : workSet) {
                     switch (w.getClusterManagementWorkType()) {
                         case ADD_NODE:
-                            if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
-                                nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+                            if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodesRequested()) {
+                                nodesToAdd = ((AddNodeWork) w).getNumberOfNodesRequested();
                             }
                             nodeAdditionRequests.add(w);
                             break;

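This hunk and the matching one in ClusterLifecycleListener above switch callers to the renamed AddNodeWork.getNumberOfNodesRequested(). Note that both call sites fold the outstanding requests with a maximum rather than a sum, so a single batch of newly added nodes can satisfy several overlapping requests. A compact sketch of that fold (AddRequest is a hypothetical stand-in for AddNodeWork):

    // Sketch: size the node-addition batch as the largest single request,
    // not the total across requests. AddRequest stands in for AddNodeWork.
    interface AddRequest {
        int getNumberOfNodesRequested();
    }

    final class NodeAdditionPlanner {
        static int nodesToAdd(Iterable<AddRequest> requests) {
            int nodesToAdd = 0;
            for (AddRequest w : requests) {
                nodesToAdd = Math.max(nodesToAdd, w.getNumberOfNodesRequested());
            }
            return nodesToAdd;
        }
    }
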
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index d2f3345..62dca1f 100755
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -124,9 +124,8 @@ public class ExternalLibraryBootstrap {
             List<edu.uci.ics.asterix.metadata.entities.DatasourceAdapter> adapters = MetadataManager.INSTANCE
                     .getDataverseAdapters(mdTxnCtx, dataverse);
             for (edu.uci.ics.asterix.metadata.entities.DatasourceAdapter adapter : adapters) {
-                if (adapter.getAdapterIdentifier().getAdapterName().startsWith(libraryName + "#")) {
-                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier()
-                            .getAdapterName());
+                if (adapter.getAdapterIdentifier().getName().startsWith(libraryName + "#")) {
+                    MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName());
                 }
             }
 
@@ -145,11 +144,12 @@ public class ExternalLibraryBootstrap {
     private static void installLibraryIfNeeded(String dataverse, final File libraryDir,
             Map<String, List<String>> uninstalledLibs) throws Exception {
 
-        String libraryName = libraryDir.getName();
+        String libraryName = libraryDir.getName().trim();
         List<String> uninstalledLibsInDv = uninstalledLibs.get(dataverse);
         boolean wasUninstalled = uninstalledLibsInDv != null && uninstalledLibsInDv.contains(libraryName);
 
         MetadataTransactionContext mdTxnCtx = null;
+        MetadataManager.INSTANCE.acquireWriteLatch();
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             edu.uci.ics.asterix.metadata.entities.Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(
@@ -168,7 +168,7 @@ public class ExternalLibraryBootstrap {
             ExternalLibrary library = getLibrary(new File(libraryDir + File.separator + libraryDescriptors[0]));
 
             if (libraryDescriptors.length == 0) {
-                throw new Exception("No library descriptors defined");
+                throw new Exception("No library descriptor defined");
             } else if (libraryDescriptors.length > 1) {
                 throw new Exception("More than 1 library descriptors defined");
             }
@@ -186,12 +186,12 @@ public class ExternalLibraryBootstrap {
                         args.add(arg);
                     }
                     edu.uci.ics.asterix.metadata.entities.Function f = new edu.uci.ics.asterix.metadata.entities.Function(
-                            dataverse, libraryName + "#" + function.getName(), args.size(), args,
-                            function.getReturnType(), function.getDefinition(), library.getLanguage(),
-                            function.getFunctionType());
+                            dataverse, libraryName + "#" + function.getName().trim(), args.size(), args, function
+                                    .getReturnType().trim(), function.getDefinition().trim(), library.getLanguage()
+                                    .trim(), function.getFunctionType().trim());
                     MetadataManager.INSTANCE.addFunction(mdTxnCtx, f);
                     if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Installed function: " + libraryName + "#" + function.getName());
+                        LOGGER.info("Installed function: " + libraryName + "#" + function.getName().trim());
                     }
                 }
             }
@@ -202,8 +202,8 @@ public class ExternalLibraryBootstrap {
 
             if (library.getLibraryAdapters() != null) {
                 for (LibraryAdapter adapter : library.getLibraryAdapters().getLibraryAdapter()) {
-                    String adapterFactoryClass = adapter.getFactoryClass();
-                    String adapterName = libraryName + "#" + adapter.getName();
+                    String adapterFactoryClass = adapter.getFactoryClass().trim();
+                    String adapterName = libraryName + "#" + adapter.getName().trim();
                     AdapterIdentifier aid = new AdapterIdentifier(dataverse, adapterName);
                     DatasourceAdapter dsa = new DatasourceAdapter(aid, adapterFactoryClass, AdapterType.EXTERNAL);
                     MetadataManager.INSTANCE.addAdapter(mdTxnCtx, dsa);
@@ -231,6 +231,8 @@ public class ExternalLibraryBootstrap {
                 LOGGER.info("Exception in installing library " + libraryName);
             }
             MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+        } finally {
+            MetadataManager.INSTANCE.releaseWriteLatch();
         }
     }
 

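Two things are worth noting in the hunks above. First, the metadata updates are now bracketed by acquireWriteLatch()/releaseWriteLatch(), with the release in a finally block so an aborted transaction cannot leak the latch. Second, the unchanged context still reads libraryDescriptors[0] before the zero-length check, so an empty library directory would raise ArrayIndexOutOfBoundsException instead of the intended error. A sketch of the safer ordering, combined with the latch pattern (illustrative, not part of this commit):

    // Sketch: check the descriptor count before indexing into the array,
    // and always release the metadata write latch. getLibrary(...) and
    // libraryDescriptors come from the surrounding context.
    MetadataManager.INSTANCE.acquireWriteLatch();
    try {
        if (libraryDescriptors.length == 0) {
            throw new Exception("No library descriptor defined");
        } else if (libraryDescriptors.length > 1) {
            throw new Exception("More than one library descriptor defined");
        }
        ExternalLibrary library = getLibrary(new File(libraryDir, libraryDescriptors[0]));
        // ... register functions and adapters via MetadataManager ...
    } finally {
        MetadataManager.INSTANCE.releaseWriteLatch();
    }
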
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java
new file mode 100644
index 0000000..497281b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedBootstrap.java
@@ -0,0 +1,50 @@
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class FeedBootstrap {
+
+    public final static String FEEDS_METADATA_DV = "feeds_metadata";
+    public final static String FAILED_TUPLE_DATASET = "failed_tuple";
+    public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
+    public final static String FAILED_TUPLE_DATASET_KEY = "id";
+
+    public static void setUpInitialArtifacts() throws Exception {
+
+        StringBuilder builder = new StringBuilder();
+        try {
+            builder.append("create dataverse " + FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("use dataverse " + FEEDS_METADATA_DV + ";" + "\n");
+
+            builder.append("create type " + FAILED_TUPLE_DATASET_TYPE + " as open { ");
+
+            String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple",
+                    "message", "timestamp" };
+            IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+            for (int i = 0; i < fieldNames.length; i++) {
+                if (i > 0) {
+                    builder.append(",");
+                }
+                builder.append(fieldNames[i] + ":");
+                builder.append(fieldTypes[i].getTypeName());
+            }
+            builder.append("}" + ";" + "\n");
+
+            builder.append("create dataset " + FAILED_TUPLE_DATASET + " " + "(" + FAILED_TUPLE_DATASET_TYPE + ")" + " "
+                    + "primary key " + FAILED_TUPLE_DATASET_KEY + " on  " + MetadataConstants.METADATA_NODEGROUP_NAME
+                    + ";");
+
+            CentralFeedManager.AQLExecutor.executeAQL(builder.toString());
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println("Error: " + builder.toString());
+            throw e;
+        }
+    }
+
+}
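
For reference, setUpInitialArtifacts() assembles and executes AQL along the following lines. This rendering assumes BuiltinType.ASTRING prints as "string"; the nodegroup name comes from MetadataConstants.METADATA_NODEGROUP_NAME, which is not shown in this diff, so it is left as a placeholder:

    create dataverse feeds_metadata;
    use dataverse feeds_metadata;
    create type FailedTupleType as open { id:string,dataverseName:string,feedName:string,targetDataset:string,tuple:string,message:string,timestamp:string};
    create dataset failed_tuple (FailedTupleType) primary key id on <METADATA_NODEGROUP_NAME>;

If execution fails, the catch block prints the assembled statement and rethrows, so a malformed bootstrap script fails loudly at startup.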