Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:43:56 UTC

[08/51] [partial] incubator-asterixdb git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
new file mode 100644
index 0000000..b45368a
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo.FeedJobState;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.PrepareStallMessage;
+import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedLoadManager implements IFeedLoadManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
+
+    private static final long MIN_MODIFICATION_INTERVAL = 180000; // 3 minutes
+    private final TreeSet<NodeLoadReport> nodeReports;
+    private final Map<FeedConnectionId, FeedActivity> feedActivities;
+    private final Map<String, Pair<Integer, Integer>> feedMetrics;
+
+    private FeedConnectionId lastModified;
+    private long lastModifiedTimestamp;
+
+    private static final int UNKNOWN = -1;
+
+    public FeedLoadManager() {
+        this.nodeReports = new TreeSet<NodeLoadReport>();
+        this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
+        this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
+    }
+
+    @Override
+    public void submitNodeLoadReport(NodeLoadReport report) {
+        nodeReports.remove(report);
+        nodeReports.add(report);
+    }
+
+    @Override
+    public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
+        FeedRuntimeId runtimeId = message.getRuntimeId();
+        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+        if (jobState == null
+                || (jobState.equals(FeedJobState.UNDER_RECOVERY))
+                || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
+                        - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Ignoring congestion report from " + runtimeId);
+            }
+            return;
+        } else {
+            try {
+                FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+                int inflowRate = message.getInflowRate();
+                int outflowRate = message.getOutflowRate();
+                List<String> currentComputeLocations = new ArrayList<String>();
+                currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
+                        .getConnectionId().getFeedId()));
+                int computeCardinality = currentComputeLocations.size();
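+                // Scale the compute cardinality by the inflow/outflow ratio and
+                // add a cushion of 5 nodes to absorb further bursts.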
+                int requiredCardinality = (int) Math.ceil((computeCardinality * inflowRate)
+                        / (double) outflowRate) + 5;
+                int additionalComputeNodes = requiredCardinality - computeCardinality;
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
+                            + additionalComputeNodes);
+                }
+
+                List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
+
+                // Step 1) Alter the original feed job to adjust the cardinality
+                JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+                        .getConnectionId());
+                List<String> newLocations = new ArrayList<String>();
+                newLocations.addAll(currentComputeLocations);
+                newLocations.addAll(helperComputeNodes);
+                FeedUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
+
+                // Step 2) send a prepare-to-stall message
+                gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
+
+                // Step 3) run the altered job specification 
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("New Job after adjusting to the workload " + jobSpec);
+                }
+
+                Thread.sleep(10000);
+                runJob(jobSpec, false);
+                lastModified = message.getConnectionId();
+                lastModifiedTimestamp = System.currentTimeMillis();
+
+            } catch (Exception e) {
+                e.printStackTrace();
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Unable to form the required job for scaling in/out: " + e.getMessage());
+                }
+                throw new AsterixException(e);
+            }
+        }
+    }
+
+    @Override
+    public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
+        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
+        if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
+            }
+            return;
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Processing scale-in message " + message);
+            }
+            FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
+            JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
+                    .getConnectionId());
+            int reducedCardinality = message.getReducedCardinaliy();
+            List<String> currentComputeLocations = new ArrayList<String>();
+            currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
+                    .getFeedId()));
+            FeedUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
+                    currentComputeLocations);
+
+            gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
+            Thread.sleep(3000);
+            JobId newJobId = runJob(jobSpec, false);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
+            }
+
+        }
+    }
+
+    private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
+            throws Exception {
+        // Step 1) send a prepare-to-stall message
+        PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
+        List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+        List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+
+        Set<String> operatorLocations = new HashSet<String>();
+
+        operatorLocations.addAll(intakeLocations);
+        operatorLocations.addAll(computeLocations);
+        operatorLocations.addAll(storageLocations);
+
+        JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
+        runJob(messageJobSpec, true);
+
+        // Step 2)
+        TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
+        messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
+        runJob(messageJobSpec, true);
+    }
+
+    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
+        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        JobId jobId = hcc.startJob(spec);
+        if (waitForCompletion) {
+            hcc.waitForCompletion(jobId);
+        }
+        return jobId;
+    }
+
+    @Override
+    public void submitFeedRuntimeReport(FeedReportMessage report) {
+        String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
+        Pair<Integer, Integer> value = feedMetrics.get(key);
+        if (value == null) {
+            value = new Pair<Integer, Integer>(report.getValue(), 1);
+            feedMetrics.put(key, value);
+        } else {
+            value.first = value.first + report.getValue();
+            value.second = value.second + 1;
+        }
+    }
+
+    @Override
+    public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
+        int rVal;
+        String key = "" + connectionId + ":" + runtimeType;
+        Pair<Integer, Integer> value = feedMetrics.get(key);
+        if (value == null) {
+            rVal = UNKNOWN;
+        } else {
+            rVal = value.first / value.second;
+        }
+        return rVal;
+    }
+
+    private List<String> getNodeForSubstitution(int nRequired) {
+        List<String> nodeIds = new ArrayList<String>();
+        Iterator<NodeLoadReport> it = null;
+        int nAdded = 0;
+        while (nAdded < nRequired) {
+            it = nodeReports.iterator();
+            while (it.hasNext() && nAdded < nRequired) {
+                nodeIds.add(it.next().getNodeId());
+                nAdded++;
+            }
+        }
+        return nodeIds;
+    }
+
+    @Override
+    public synchronized List<String> getNodes(int required) {
+        Iterator<NodeLoadReport> it;
+        List<String> allocated = new ArrayList<String>();
+        while (allocated.size() < required) {
+            it = nodeReports.iterator();
+            while (it.hasNext() && allocated.size() < required) {
+                allocated.add(it.next().getNodeId());
+            }
+        }
+        return allocated;
+    }
+
+    @Override
+    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Throttling enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
+        }
+        FeedConnectionId connectionId = mesg.getConnectionId();
+        List<String> destinationLocations = new ArrayList<String>();
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+
+        destinationLocations.addAll(storageLocations);
+        destinationLocations.addAll(collectLocations);
+        JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
+                destinationLocations);
+        runJob(messageJobSpec, true);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
+        }
+        IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
+        trackingManager.disableAcking(connectionId);
+    }
+
+    @Override
+    public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
+        feedActivities.put(connectionId, activity);
+    }
+
+    @Override
+    public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
+        return feedActivities.get(connectionId);
+    }
+
+    @Override
+    public Collection<FeedActivity> getFeedActivities() {
+        return feedActivities.values();
+    }
+
+    @Override
+    public void removeFeedActivity(FeedConnectionId connectionId) {
+        feedActivities.remove(connectionId);
+    }
+}
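
The metric bookkeeping above (submitFeedRuntimeReport/getOutflowRate) keeps a
running (sum, count) pair per "<connectionId>:<runtimeType>" key and reports
the integer average, or UNKNOWN (-1) when no report has arrived. A minimal
standalone sketch of that logic, using an int[2] in place of the Algebricks
Pair:

    import java.util.HashMap;
    import java.util.Map;

    public class OutflowRateSketch {
        private static final int UNKNOWN = -1;
        private final Map<String, int[]> metrics = new HashMap<String, int[]>();

        // Mirrors submitFeedRuntimeReport(): accumulate (sum, count) per key.
        public void report(String key, int value) {
            int[] pair = metrics.get(key);
            if (pair == null) {
                metrics.put(key, new int[] { value, 1 });
            } else {
                pair[0] += value;
                pair[1] += 1;
            }
        }

        // Mirrors getOutflowRate(): integer average, or UNKNOWN if unseen.
        public int rate(String key) {
            int[] pair = metrics.get(key);
            return pair == null ? UNKNOWN : pair[0] / pair[1];
        }

        public static void main(String[] args) {
            OutflowRateSketch s = new OutflowRateSketch();
            s.report("feed1:COMPUTE", 100);
            s.report("feed1:COMPUTE", 200);
            System.out.println(s.rate("feed1:COMPUTE")); // 150
            System.out.println(s.rate("feed2:STORE"));   // -1
        }
    }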

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
new file mode 100644
index 0000000..e3020aa
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.FeedMessageService;
+import edu.uci.ics.asterix.common.feeds.FeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReportService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedConnectionManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMemoryManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetricCollector;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionManager;
+import edu.uci.ics.asterix.metadata.feeds.FeedSubscriptionManager;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of the IFeedManager interface.
+ * Provides the central repository for registering and retrieving
+ * artifacts/services associated with a feed.
+ */
+public class FeedManager implements IFeedManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
+
+    private final IFeedSubscriptionManager feedSubscriptionManager;
+
+    private final IFeedConnectionManager feedConnectionManager;
+
+    private final IFeedMemoryManager feedMemoryManager;
+
+    private final IFeedMetricCollector feedMetricCollector;
+
+    private final IFeedMetadataManager feedMetadataManager;
+
+    private final IFeedMessageService feedMessageService;
+
+    private final NodeLoadReportService nodeLoadReportService;
+
+    private final AsterixFeedProperties asterixFeedProperties;
+
+    private final String nodeId;
+
+    private final int frameSize;
+
+    public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) throws AsterixException, HyracksDataException {
+        this.nodeId = nodeId;
+        this.feedSubscriptionManager = new FeedSubscriptionManager(nodeId);
+        this.feedConnectionManager = new FeedConnectionManager(nodeId);
+        this.feedMetadataManager = new FeedMetadataManager(nodeId);
+        this.feedMemoryManager = new FeedMemoryManager(nodeId, feedProperties, frameSize);
+        String ccClusterIp = AsterixClusterProperties.INSTANCE.getCluster() != null ? AsterixClusterProperties.INSTANCE
+                .getCluster().getMasterNode().getClusterIp() : "localhost";
+        this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
+        this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+        try {
+            this.feedMessageService.start();
+            this.nodeLoadReportService.start();
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to start feed services " + e.getMessage());
+            }
+            e.printStackTrace();
+        }
+        this.feedMetricCollector = new FeedMetricCollector(nodeId);
+        this.frameSize = frameSize;
+        this.asterixFeedProperties = feedProperties;
+    }
+
+    @Override
+    public IFeedSubscriptionManager getFeedSubscriptionManager() {
+        return feedSubscriptionManager;
+    }
+
+    @Override
+    public IFeedConnectionManager getFeedConnectionManager() {
+        return feedConnectionManager;
+    }
+
+    @Override
+    public IFeedMemoryManager getFeedMemoryManager() {
+        return feedMemoryManager;
+    }
+
+    @Override
+    public IFeedMetricCollector getFeedMetricCollector() {
+        return feedMetricCollector;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    @Override
+    public IFeedMetadataManager getFeedMetadataManager() {
+        return feedMetadataManager;
+    }
+
+    @Override
+    public IFeedMessageService getFeedMessageService() {
+        return feedMessageService;
+    }
+
+    @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        return "FeedManager " + "[" + nodeId + "]";
+    }
+
+    @Override
+    public AsterixFeedProperties getAsterixFeedProperties() {
+        return asterixFeedProperties;
+    }
+
+}
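
The manager above is a pure service registry: per-NC components reach the
shared feed services through it rather than wiring them up individually. A
hedged fragment illustrating the lookup (the sendMessage(...) call mirrors the
usage in FeedMetadataManager.logTuple() below; treating it as accepting any
IFeedMessage is an assumption):

    import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
    import edu.uci.ics.asterix.common.feeds.api.IFeedMessage;
    import edu.uci.ics.asterix.common.feeds.api.IFeedMessageService;

    public class FeedServiceLookup {
        // Obtain the shared message service from the node's IFeedManager
        // and forward a message through it.
        public static void send(IFeedManager feedManager, IFeedMessage message) {
            IFeedMessageService service = feedManager.getFeedMessageService();
            service.sendMessage(message); // assumed signature
        }
    }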

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
new file mode 100644
index 0000000..8530370
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.logging.Level;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.MessageReceiver;
+import edu.uci.ics.asterix.common.feeds.NodeLoadReport;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLoadManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMessage.MessageType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.common.feeds.message.FeedCongestionMessage;
+import edu.uci.ics.asterix.common.feeds.message.FeedReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.ScaleInReportMessage;
+import edu.uci.ics.asterix.common.feeds.message.StorageReportFeedMessage;
+import edu.uci.ics.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import edu.uci.ics.asterix.feeds.CentralFeedManager.AQLExecutor;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+
+public class FeedMessageReceiver extends MessageReceiver<String> {
+
+    private static boolean initialized;
+
+    private final IFeedLoadManager feedLoadManager;
+    private final IFeedTrackingManager feedTrackingManager;
+
+    public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
+        this.feedLoadManager = centralFeedManager.getFeedLoadManager();
+        this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
+    }
+
+    @Override
+    public void processMessage(String message) throws Exception {
+        JSONObject obj = new JSONObject(message);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Received message " + obj);
+        }
+        MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
+        switch (messageType) {
+            case XAQL:
+                if (!initialized) {
+                    FeedBootstrap.setUpInitialArtifacts();
+                    initialized = true;
+                }
+                AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
+                break;
+            case CONGESTION:
+                feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
+                break;
+            case FEED_REPORT:
+                feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
+                break;
+            case NODE_REPORT:
+                feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
+                break;
+            case SCALE_IN_REQUEST:
+                feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
+                break;
+            case STORAGE_REPORT:
+                FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
+                break;
+            case COMMIT_ACK:
+                feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
+                break;
+            case THROTTLING_ENABLED:
+                feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
+                break;
+            default:
+                break;
+        }
+
+    }
+}
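
The receiver above decodes a JSON envelope and dispatches on its message-type
field. A standalone sketch of that pattern (the literal key "message-type" is
an assumption; the real key is FeedConstants.MessageConstants.MESSAGE_TYPE):

    import org.json.JSONObject;

    public class DispatchSketch {
        enum MessageType {
            XAQL, CONGESTION, FEED_REPORT, NODE_REPORT, SCALE_IN_REQUEST,
            STORAGE_REPORT, COMMIT_ACK, THROTTLING_ENABLED
        }

        public static void main(String[] args) throws Exception {
            JSONObject obj = new JSONObject("{\"message-type\":\"NODE_REPORT\"}");
            MessageType type = MessageType.valueOf(obj.getString("message-type"));
            switch (type) {
                case NODE_REPORT:
                    // the real receiver calls feedLoadManager.submitNodeLoadReport(...)
                    System.out.println("node report received");
                    break;
                default:
                    break;
            }
        }
    }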

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
new file mode 100644
index 0000000..b927c90
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedMetadataManager;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedBootstrap;
+import edu.uci.ics.asterix.metadata.feeds.XAQLFeedMessage;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedMetadataManager implements IFeedMetadataManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
+
+    private final String nodeId;
+    private ARecordType recordType;
+
+    public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
+        this.nodeId = nodeId;
+        String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
+                "timestamp" };
+        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+
+        recordType = new ARecordType(FeedBootstrap.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
+    }
+
+    @Override
+    public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
+            throws AsterixException {
+        try {
+            AString id = new AString("1");
+            AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
+            AString feedValue = new AString(connectionId.getFeedId().getFeedName());
+            AString targetDatasetValue = new AString(connectionId.getDatasetName());
+            AString tupleValue = new AString(tuple);
+            AString messageValue = new AString(message);
+            AString dateTime = new AString(new Date().toString());
+
+            IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
+                    messageValue, dateTime };
+            ARecord record = new ARecord(recordType, fields);
+            StringBuilder builder = new StringBuilder();
+            builder.append("use dataverse " + FeedBootstrap.FEEDS_METADATA_DV + ";" + "\n");
+            builder.append("insert into dataset " + FeedBootstrap.FAILED_TUPLE_DATASET + " ");
+            builder.append(" (" + recordToString(record) + ")");
+            builder.append(";");
+
+            XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
+            feedManager.getFeedMessageService().sendMessage(xAqlMessage);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(" Sent " + xAqlMessage.toJSON());
+            }
+        } catch (Exception pe) {
+            throw new AsterixException(pe);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "FeedMetadataManager [" + nodeId + "]";
+    }
+
+    private String recordToString(ARecord record) {
+        String[] fieldNames = record.getType().getFieldNames();
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ ");
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append("\"" + fieldNames[i] + "\"");
+            sb.append(": ");
+            switch (record.getType().getFieldTypes()[i].getTypeTag()) {
+                case STRING:
+                    sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
+                    break;
+            }
+        }
+        sb.append(" }");
+        return sb.toString();
+    }
+}
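
Given the record type above, logTuple() ships an XAQL message whose AQL
payload has roughly this shape (feeds_metadata and failed_tuple are
illustrative stand-ins for the FeedBootstrap constants; the field values are
made up):

    use dataverse feeds_metadata;
    insert into dataset failed_tuple
     ({ "id": "1", "dataverseName": "fb", "feedName": "TwitterFeed",
        "targetDataset": "Tweets", "tuple": "<failed tuple text>",
        "message": "<failure cause>", "timestamp": "Tue Aug 25 11:43:56 PDT 2015" });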

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
new file mode 100644
index 0000000..3e5a1a4
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitAckMessage;
+import edu.uci.ics.asterix.common.feeds.FeedTupleCommitResponseMessage;
+import edu.uci.ics.asterix.common.feeds.api.IFeedTrackingManager;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedTrackingManager implements IFeedTrackingManager {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
+
+    private final BitSet allOnes;
+
+    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
+    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
+
+    public FeedTrackingManager() {
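+        // A 128-byte (1024-bit) all-ones mask; an intake batch counts as fully
+        // acknowledged once the OR of its ack bitmaps equals this mask.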
+        byte[] allOneBytes = new byte[128];
+        Arrays.fill(allOneBytes, (byte) 0xff);
+        allOnes = BitSet.valueOf(allOneBytes);
+        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
+        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
+    }
+
+    @Override
+    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
+        AckId ackId = getAckId(ackMessage);
+        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
+        if (acksForConnection == null) {
+            acksForConnection = new HashMap<AckId, BitSet>();
+            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
+            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
+        }
+        BitSet currentAcks = acksForConnection.get(ackId);
+        if (currentAcks == null) {
+            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
+            acksForConnection.put(ackId, currentAcks);
+        } else {
+            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
+        }
+        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ") is covered");
+            }
+            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
+            if (maxBaseAckedForConnection == null) {
+                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
+                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
+            }
+            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
+            if (maxBaseAckedValue == null) {
+                maxBaseAckedValue = ackMessage.getBase();
+                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+                        ackMessage.getBase());
+            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
+                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
+                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
+                        ackMessage.getBase());
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
+                }
+            }
+
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
+            }
+        }
+    }
+
+    public synchronized void disableTracking(FeedConnectionId connectionId) {
+        ackHistory.remove(connectionId);
+        maxBaseAcked.remove(connectionId);
+    }
+
+    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
+        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
+        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
+        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
+        String collectLocation = collectLocations.get(partition);
+        Set<String> messageDestinations = new HashSet<String>();
+        messageDestinations.add(collectLocation);
+        messageDestinations.addAll(storageLocations);
+        try {
+            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
+            CentralFeedManager.runJob(spec, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
+            }
+        }
+    }
+
+    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
+        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
+    }
+
+    private static class AckId {
+        private FeedConnectionId connectionId;
+        private int intakePartition;
+        private int base;
+
+        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
+            this.connectionId = connectionId;
+            this.intakePartition = intakePartition;
+            this.base = base;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof AckId)) {
+                return false;
+            }
+            AckId other = (AckId) o;
+            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
+                    && other.getBase() == base;
+        }
+
+        @Override
+        public String toString() {
+            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
+        }
+
+        @Override
+        public int hashCode() {
+            return toString().hashCode();
+        }
+
+        public FeedConnectionId getConnectionId() {
+            return connectionId;
+        }
+
+        public int getIntakePartition() {
+            return intakePartition;
+        }
+
+        public int getBase() {
+            return base;
+        }
+
+    }
+
+    @Override
+    public void disableAcking(FeedConnectionId connectionId) {
+        ackHistory.remove(connectionId);
+        maxBaseAcked.remove(connectionId);
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("Acking disabled for " + connectionId);
+        }
+    }
+
+}
\ No newline at end of file
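
The coverage test above ORs each partial acknowledgement into a per-(partition,
base) BitSet and declares the batch covered once the result equals a 1024-bit
all-ones mask. A standalone sketch of that check:

    import java.util.Arrays;
    import java.util.BitSet;

    public class AckCoverageSketch {
        public static void main(String[] args) {
            // Same mask as the FeedTrackingManager constructor: 128 bytes of 0xff.
            byte[] allOneBytes = new byte[128];
            Arrays.fill(allOneBytes, (byte) 0xff);
            BitSet allOnes = BitSet.valueOf(allOneBytes);

            // Two partial acks covering complementary halves of the batch.
            BitSet acks = new BitSet(1024);
            acks.set(0, 512);            // first ack message
            BitSet second = new BitSet(1024);
            second.set(512, 1024);       // second ack message
            acks.or(second);             // submitAckReport() ORs acks together

            // Covered exactly when every one of the 1024 bits is set.
            System.out.println(Arrays.equals(acks.toByteArray(), allOnes.toByteArray())); // true
        }
    }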

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
new file mode 100644
index 0000000..ba54406
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.common.feeds.FeedConnectJobInfo;
+import edu.uci.ics.asterix.common.feeds.FeedIntakeInfo;
+import edu.uci.ics.asterix.common.feeds.FeedJobInfo;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedWorkRequestResponseHandler implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
+
+    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
+
+    private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
+
+    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
+        this.inbox = inbox;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            IClusterManagementWorkResponse response = null;
+            try {
+                response = inbox.take();
+            } catch (InterruptedException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Interrupted exception " + e.getMessage());
+                }
+                // no response to process; retry the blocking take()
+                continue;
+            }
+            IClusterManagementWork submittedWork = response.getWork();
+            Map<String, String> nodeSubstitution = new HashMap<String, String>();
+            switch (submittedWork.getClusterManagementWorkType()) {
+                case ADD_NODE:
+                    AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
+                    int workId = addNodeWork.getWorkId();
+                    Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
+                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+                    List<String> nodesAdded = resp.getNodesAdded();
+                    List<String> unsubstitutedNodes = new ArrayList<String>();
+                    unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
+                    int nodeIndex = 0;
+
+                    // form a mapping between the failed node and its substitute
+                    if (nodesAdded != null && nodesAdded.size() > 0) {
+                        for (String failedNodeId : addNodeWork.getDeadNodes()) {
+                            String substitute = nodesAdded.get(nodeIndex);
+                            nodeSubstitution.put(failedNodeId, substitute);
+                            nodeIndex = (nodeIndex + 1) % nodesAdded.size();
+                            unsubstitutedNodes.remove(failedNodeId);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
+                            }
+                        }
+                    }
+                    if (unsubstitutedNodes.size() > 0) {
+                        String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(
+                                new String[] {});
+                        nodeIndex = 0;
+                        for (String unsubstitutedNode : unsubstitutedNodes) {
+                            nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
+                                        + unsubstitutedNode);
+                            }
+                            nodeIndex = (nodeIndex + 1) % participantNodes.length;
+                        }
+
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
+                        }
+                    }
+
+                    // alter the job specifications of all impacted feed jobs
+
+                    for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
+                        String failedNode = entry.getKey();
+                        List<FeedJobInfo> impactedJobInfos = entry.getValue();
+                        for (FeedJobInfo info : impactedJobInfos) {
+                            JobSpecification spec = info.getSpec();
+                            replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
+                            info.setSpec(spec);
+                        }
+                    }
+
+                    Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
+                    Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
+
+                    for (List<FeedJobInfo> infos : failureAnalysis.values()) {
+                        for (FeedJobInfo info : infos) {
+                            switch (info.getJobType()) {
+                                case INTAKE:
+                                    revisedIntakeJobs.add((FeedIntakeInfo) info);
+                                    break;
+                                case FEED_CONNECT:
+                                    revisedConnectJobInfos.add((FeedConnectJobInfo) info);
+                                    break;
+                            }
+                        }
+                    }
+
+                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+                    try {
+                        for (FeedIntakeInfo info : revisedIntakeJobs) {
+                            hcc.startJob(info.getSpec());
+                        }
+                        Thread.sleep(2000);
+                        for (FeedConnectJobInfo info : revisedConnectJobInfos) {
+                            hcc.startJob(info.getSpec());
+                            Thread.sleep(2000);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to start revised job post failure");
+                        }
+                    }
+
+                    break;
+                case REMOVE_NODE:
+                    throw new IllegalStateException("Invalid work submitted");
+            }
+        }
+    }
+
+    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
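+        // Rewriting strategy: (1) scan the user constraints, collecting the
+        // location constraints that name the failed node and noting, per
+        // operator, the untouched constraints as candidates; (2) remove the
+        // stale location/count constraints from the spec; (3) fold the
+        // candidates' partitions back in and re-register a complete absolute
+        // location constraint per affected operator with the replacement node.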
+        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
+        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
+        OperatorDescriptorId opId = null;
+        for (Constraint constraint : userConstraints) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT:
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    if (modifiedOperators.contains(opId)) {
+                        countConstraintsToReplace.add(constraint);
+                    } else {
+                        List<Constraint> clist = candidateConstraints.get(opId);
+                        if (clist == null) {
+                            clist = new ArrayList<Constraint>();
+                            candidateConstraints.put(opId, clist);
+                        }
+                        clist.add(constraint);
+                    }
+                    break;
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                    if (oldLocation.equals(failedNodeId)) {
+                        locationConstraintsToReplace.add(constraint);
+                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+                        Map<Integer, String> newLocs = newConstraints.get(opId);
+                        if (newLocs == null) {
+                            newLocs = new HashMap<Integer, String>();
+                            newConstraints.put(opId, newLocs);
+                        }
+                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                        newLocs.put(partition, replacementNode);
+                    } else {
+                        if (modifiedOperators.contains(opId)) {
+                            locationConstraintsToReplace.add(constraint);
+                            Map<Integer, String> newLocs = newConstraints.get(opId);
+                            if (newLocs == null) {
+                                newLocs = new HashMap<Integer, String>();
+                                newConstraints.put(opId, newLocs);
+                            }
+                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                            newLocs.put(partition, oldLocation);
+                        } else {
+                            List<Constraint> clist = candidateConstraints.get(opId);
+                            if (clist == null) {
+                                clist = new ArrayList<Constraint>();
+                                candidateConstraints.put(opId, clist);
+                            }
+                            clist.add(constraint);
+                        }
+                    }
+                    break;
+            }
+        }
+
+        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+        for (OperatorDescriptorId mopId : modifiedOperators) {
+            List<Constraint> clist = candidateConstraints.get(mopId);
+            if (clist != null && !clist.isEmpty()) {
+                jobSpec.getUserConstraints().removeAll(clist);
+
+                for (Constraint c : clist) {
+                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+                        ConstraintExpression cexpr = c.getRValue();
+                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
+                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                        newConstraints.get(mopId).put(partition, oldLocation);
+                    }
+                }
+            }
+        }
+
+        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
+            OperatorDescriptorId nopId = entry.getKey();
+            Map<Integer, String> clist = entry.getValue();
+            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+            String[] locations = new String[clist.size()];
+            for (int i = 0; i < locations.length; i++) {
+                locations[i] = clist.get(i);
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
+        }
+
+    }
+
+    public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
+        feedsWaitingForResponse.put(workId, impactedJobs);
+    }
+}
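
The ADD_NODE branch above assigns substitutes round-robin, wrapping when fewer
replacement nodes were granted than nodes failed. A compact standalone sketch
of that mapping step:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SubstitutionSketch {
        // Round-robin assignment of added nodes to failed nodes, wrapping
        // via modulo when the added list is shorter than the dead list.
        static Map<String, String> substitute(List<String> dead, List<String> added) {
            Map<String, String> mapping = new HashMap<String, String>();
            int i = 0;
            for (String failed : dead) {
                mapping.put(failed, added.get(i));
                i = (i + 1) % added.size();
            }
            return mapping;
        }

        public static void main(String[] args) {
            System.out.println(substitute(Arrays.asList("nc3", "nc4", "nc5"),
                    Arrays.asList("nc7", "nc8")));
            // e.g. {nc5=nc7, nc4=nc8, nc3=nc7} (map order unspecified)
        }
    }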

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
new file mode 100644
index 0000000..0e7ec5b
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feeds;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class FeedsActivator implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedsActivator.class.getName());
+
+    private List<FeedCollectInfo> feedsToRevive;
+    private Mode mode;
+
+    public enum Mode {
+        REVIVAL_POST_CLUSTER_REBOOT,
+        REVIVAL_POST_NODE_REJOIN
+    }
+
+    public FeedsActivator() {
+        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+    }
+
+    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
+        this.feedsToRevive = feedsToRevive;
+        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+    }
+
+    @Override
+    public void run() {
+        switch (mode) {
+            case REVIVAL_POST_CLUSTER_REBOOT:
+                //revivePostClusterReboot();
+                break;
+            case REVIVAL_POST_NODE_REJOIN:
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e1) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Attempt to resume feed interrupted");
+                    }
+                    throw new IllegalStateException(e1.getMessage());
+                }
+                for (FeedCollectInfo finfo : feedsToRevive) {
+                    try {
+                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+                            LOGGER.info("Job:" + finfo.jobSpec);
+                        }
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
+                        }
+                    }
+                }
+        }
+    }
+
+    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+        PrintWriter writer = new PrintWriter(System.out, true);
+        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+        try {
+            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
+                    new Identifier(dataset), feedPolicy, 0);
+            stmt.setForceConnect(true);
+            List<Statement> statements = new ArrayList<Statement>();
+            statements.add(dataverseDecl);
+            statements.add(stmt);
+            AqlTranslator translator = new AqlTranslator(statements, pc);
+            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
+                        + feedPolicy + " Exception " + e.getMessage());
+            }
+        }
+    }
+}
\ No newline at end of file
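
For context, a minimal sketch of how this Runnable is meant to be scheduled after a node rejoins; the executor and the pre-collected feedsToRevive list are illustrative assumptions, not part of this patch (FeedCollectInfo sits in the same edu.uci.ics.asterix.feeds package as FeedsActivator, so no extra import is needed for it there):

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import edu.uci.ics.asterix.feeds.FeedCollectInfo;
    import edu.uci.ics.asterix.feeds.FeedsActivator;

    public class FeedRevivalSketch {
        // Hypothetical caller: revive previously collected feeds on one background thread.
        public static void reviveAfterRejoin(List<FeedCollectInfo> feedsToRevive) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            // REVIVAL_POST_NODE_REJOIN mode: the activator waits ~10s, then restarts each feed job.
            executor.submit(new FeedsActivator(feedsToRevive));
            executor.shutdown(); // no further tasks; let the revival task run to completion
        }
    }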

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
new file mode 100644
index 0000000..d476eed
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.file;
+
+import java.io.File;
+import java.rmi.RemoteException;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
+
+public class DatasetOperations {
+
+    private static final Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+
+    public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
+            ACIDException, AsterixException {
+
+        String dataverseName = null;
+        if (datasetDropStmt.getDataverseName() != null) {
+            dataverseName = datasetDropStmt.getDataverseName();
+        } else if (metadataProvider.getDefaultDataverse() != null) {
+            dataverseName = metadataProvider.getDefaultDataverse().getDataverseName();
+        }
+
+        String datasetName = datasetDropStmt.getDatasetName();
+        String datasetPath = dataverseName + File.separator + datasetName;
+
+        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+        }
+        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+            return JobSpecificationUtils.createJobSpecification();
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+
+        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                dataverseName);
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+
+        ARecordType itemType = (ARecordType) metadataProvider.findType(dataverseName, dataset.getItemTypeName());
+
+        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        int[] filterFields = DatasetUtils.createFilterFields(dataset);
+        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+        JobSpecification specPrimary = JobSpecificationUtils.createJobSpecification();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
+                        temp);
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
+
+        // The index drop operation should be persistent regardless of temp datasets or permanent dataset
+        IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+                        filterCmpFactories, btreeFields, filterFields, true));
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+                splitsAndConstraint.second);
+
+        specPrimary.addRoot(primaryBtreeDrop);
+
+        return specPrimary;
+    }
+
+    public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
+            AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
+        String dataverseName = dataverse.getDataverseName();
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ARecordType itemType = (ARecordType) metadata.findType(dataverseName, dataset.getItemTypeName());
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+        int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
+
+        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        int[] filterFields = DatasetUtils.createFilterFields(dataset);
+        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+        FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < fs.length; i++) {
+            sb.append(stringOf(fs[i])).append(' ');
+        }
+        LOGGER.info("CREATING File Splits: " + sb.toString());
+
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadata.getMetadataTxnContext());
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        // prepare a LocalResourceMetadata that will be stored in the NC's local resource repository
+        ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
+                comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
+                compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
+                localResourceMetadata, LocalResource.LSMBTreeResource);
+
+        // The index create operation should be persistent regardless of temp datasets or permanent dataset
+        TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(dataset
+                                .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
+                                .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                        btreeFields, filterFields, true), localResourceFactoryProvider,
+                NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
+                splitsAndConstraint.second);
+        spec.addRoot(indexCreateOp);
+        return spec;
+    }
+
+    private static String stringOf(FileSplit fs) {
+        return fs.getNodeName() + ":" + fs.getLocalFile().toString();
+    }
+
+    public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
+            AqlMetadataProvider metadata) throws AsterixException, AlgebricksException {
+        String dataverseName = dataverse.getDataverseName();
+        IDataFormat format;
+        try {
+            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        boolean temp = dataset.getDatasetDetails().isTemp();
+
+        ARecordType itemType = (ARecordType) metadata.findType(dataverseName, dataset.getItemTypeName());
+        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+        int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset);
+
+        ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                itemType, format.getBinaryComparatorFactoryProvider());
+        int[] filterFields = DatasetUtils.createFilterFields(dataset);
+        int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
+                metadata.getMetadataTxnContext());
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+                        filterCmpFactories, btreeFields, filterFields, !temp), NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper
+                .setPartitionConstraintInJobSpec(spec, compactOp, splitsAndConstraint.second);
+        spec.addRoot(compactOp);
+        return spec;
+    }
+}
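
All three methods above return a JobSpecification rather than executing anything; a hedged sketch of the call site, which submits the spec through the Hyracks client connection and blocks until the storage operation finishes (where the connection comes from is an assumption here, e.g. the configured CC address):

    import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
    import edu.uci.ics.hyracks.api.job.JobId;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    public class DatasetJobRunnerSketch {
        // Illustrative helper: run a spec produced by, e.g., DatasetOperations.createDatasetJobSpec(...).
        public static void runSync(IHyracksClientConnection hcc, JobSpecification spec) throws Exception {
            JobId jobId = hcc.startJob(spec); // submit the create/drop/compact job to the cluster controller
            hcc.waitForCompletion(jobId);     // block until every partition has finished
        }
    }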

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/34d81630/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
new file mode 100644
index 0000000..997893f
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataverseOperations {
+    public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
+        JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
+        jobSpec.addRoot(frod);
+        return jobSpec;
+    }
+}
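
The dataverse drop spec is a single job rooted at a FileRemoveOperatorDescriptor; as a sketch, assuming the same synchronous submission pattern as above, dropping a dataverse's files reduces to:

    import edu.uci.ics.asterix.file.DataverseOperations;
    import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
    import edu.uci.ics.asterix.metadata.entities.Dataverse;
    import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    public class DropDataverseSketch {
        // Hypothetical call site: compile and run the file-removal job for one dataverse.
        public static void dropFiles(IHyracksClientConnection hcc, Dataverse dv, AqlMetadataProvider metadata)
                throws Exception {
            JobSpecification spec = DataverseOperations.createDropDataverseJobSpec(dv, metadata);
            hcc.waitForCompletion(hcc.startJob(spec)); // removes the dataverse's files on each partition
        }
    }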