You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:17 UTC

[23/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
deleted file mode 100644
index d53428d..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLifecycleListener.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.feeds.FeedConnectJobInfo;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConnectionRequest;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedIntakeInfo;
-import org.apache.asterix.common.feeds.FeedJobInfo;
-import org.apache.asterix.common.feeds.FeedJobInfo.FeedJobState;
-import org.apache.asterix.common.feeds.FeedJointKey;
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleListener;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.statement.DisconnectFeedStatement;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.ClusterManager;
-import org.apache.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-/**
- * A listener that subscribes to events associated with cluster membership
- * (nodes joining/leaving the cluster) and job lifecycle (start/end of a job).
- * Subscription to such events allows keeping track of feed ingestion jobs and
- * taking any corrective action that may be required when a node involved in a
- * feed leaves the cluster.
- */
-public class FeedLifecycleListener implements IFeedLifecycleListener {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
-
-    public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    private final LinkedBlockingQueue<Message> jobEventInbox;
-    private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
-    private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
-    private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
-    private final FeedJobNotificationHandler feedJobNotificationHandler;
-    private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
-    private final ExecutorService executorService;
-
-    private ClusterState state;
-
-    private FeedLifecycleListener() {
-        this.jobEventInbox = new LinkedBlockingQueue<Message>();
-        this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
-        this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
-        this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
-        this.feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
-        this.executorService = Executors.newCachedThreadPool();
-        this.executorService.execute(feedJobNotificationHandler);
-        this.executorService.execute(feedWorkRequestResponseHandler);
-        ClusterManager.INSTANCE.registerSubscriber(this);
-        this.state = AsterixClusterProperties.INSTANCE.getState();
-    }
-
-    @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
-        if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
-            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
-        }
-    }
-
-    @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
-        if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
-            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
-            }
-        }
-    }
-
-    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.getFeedConnectJobInfo(connectionId);
-    }
-
-    public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
-            IIntakeProgressTracker feedIntakeProgressTracker) {
-        feedJobNotificationHandler.registerFeedIntakeProgressTracker(connectionId, feedIntakeProgressTracker);
-    }
-
-    public void deregisterFeedIntakeProgressTracker(FeedConnectionId connectionId) {
-        feedJobNotificationHandler.deregisterFeedIntakeProgressTracker(connectionId);
-    }
-
-    public void updateTrackingInformation(StorageReportFeedMessage srm) {
-        feedJobNotificationHandler.updateTrackingInformation(srm);
-    }
-
-    /*
-     * Traverse the job specification and register the job as a feed collection or feed intake job (first match wins)
-     */
-    @Override
-    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
-        JobSpecification spec = acggf.getJobSpecification();
-        FeedConnectionId feedConnectionId = null;
-        Map<String, String> feedPolicy = null;
-        for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
-            if (opDesc instanceof FeedCollectOperatorDescriptor) {
-                feedConnectionId = ((FeedCollectOperatorDescriptor) opDesc).getFeedConnectionId();
-                feedPolicy = ((FeedCollectOperatorDescriptor) opDesc).getFeedPolicyProperties();
-                feedJobNotificationHandler.registerFeedCollectionJob(
-                        ((FeedCollectOperatorDescriptor) opDesc).getSourceFeedId(), feedConnectionId, jobId, spec,
-                        feedPolicy);
-                break;
-            } else if (opDesc instanceof FeedIntakeOperatorDescriptor) {
-                feedJobNotificationHandler.registerFeedIntakeJob(((FeedIntakeOperatorDescriptor) opDesc).getFeedId(),
-                        jobId, spec);
-                break;
-            }
-        }
-    }
-
-    public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
-        feedJobNotificationHandler.setJobState(connectionId, jobState);
-    }
-
-    public FeedJobState getFeedJobState(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.getFeedJobState(connectionId);
-    }
-
-    public static class Message {
-        public JobId jobId;
-
-        public enum MessageKind {
-            JOB_START,
-            JOB_FINISH
-        }
-
-        public MessageKind messageKind;
-
-        public Message(JobId jobId, MessageKind msgKind) {
-            this.jobId = jobId;
-            this.messageKind = msgKind;
-        }
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        Set<IClusterManagementWork> workToBeDone = new HashSet<IClusterManagementWork>();
-
-        Collection<FeedIntakeInfo> intakeInfos = feedJobNotificationHandler.getFeedIntakeInfos();
-        Collection<FeedConnectJobInfo> connectJobInfos = feedJobNotificationHandler.getFeedConnectInfos();
-
-        Map<String, List<FeedJobInfo>> impactedJobs = new HashMap<String, List<FeedJobInfo>>();
-
-        for (String deadNode : deadNodeIds) {
-            for (FeedIntakeInfo intakeInfo : intakeInfos) {
-                if (intakeInfo.getIntakeLocation().contains(deadNode)) {
-                    List<FeedJobInfo> infos = impactedJobs.get(deadNode);
-                    if (infos == null) {
-                        infos = new ArrayList<FeedJobInfo>();
-                        impactedJobs.put(deadNode, infos);
-                    }
-                    infos.add(intakeInfo);
-                    intakeInfo.setState(FeedJobState.UNDER_RECOVERY);
-                }
-            }
-
-            for (FeedConnectJobInfo connectInfo : connectJobInfos) {
-                if (connectInfo.getStorageLocations().contains(deadNode)) {
-                    continue;
-                }
-                if (connectInfo.getComputeLocations().contains(deadNode)
-                        || connectInfo.getCollectLocations().contains(deadNode)) {
-                    List<FeedJobInfo> infos = impactedJobs.get(deadNode);
-                    if (infos == null) {
-                        infos = new ArrayList<FeedJobInfo>();
-                        impactedJobs.put(deadNode, infos);
-                    }
-                    infos.add(connectInfo);
-                    connectInfo.setState(FeedJobState.UNDER_RECOVERY);
-                    feedJobNotificationHandler.deregisterFeedActivity(connectInfo);
-                }
-            }
-
-        }
-
-        if (impactedJobs.size() > 0) {
-            AddNodeWork addNodeWork = new AddNodeWork(deadNodeIds, deadNodeIds.size(), this);
-            feedWorkRequestResponseHandler.registerFeedWork(addNodeWork.getWorkId(), impactedJobs);
-            workToBeDone.add(addNodeWork);
-        }
-        return workToBeDone;
-
-    }
-
-    public static class FailureReport {
-
-        private final List<Pair<FeedConnectJobInfo, List<String>>> recoverableConnectJobs;
-        private final Map<IFeedJoint, List<String>> recoverableIntakeFeedIds;
-
-        public FailureReport(Map<IFeedJoint, List<String>> recoverableIntakeFeedIds,
-                List<Pair<FeedConnectJobInfo, List<String>>> recoverableSubscribers) {
-            this.recoverableConnectJobs = recoverableSubscribers;
-            this.recoverableIntakeFeedIds = recoverableIntakeFeedIds;
-        }
-
-        public List<Pair<FeedConnectJobInfo, List<String>>> getRecoverableSubscribers() {
-            return recoverableConnectJobs;
-        }
-
-        public Map<IFeedJoint, List<String>> getRecoverableIntakeFeedIds() {
-            return recoverableIntakeFeedIds;
-        }
-
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
-        }
-
-        boolean needToReActivateFeeds = !newState.equals(state) && (newState == ClusterState.ACTIVE);
-        if (needToReActivateFeeds) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
-            }
-            try {
-                FeedsActivator activator = new FeedsActivator();
-                (new Thread(activator)).start();
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                }
-            }
-            state = newState;
-        } else {
-            List<FeedCollectInfo> feedsThatCanBeRevived = new ArrayList<FeedCollectInfo>();
-            for (Entry<FeedCollectInfo, List<String>> entry : dependentFeeds.entrySet()) {
-                List<String> requiredNodeIds = entry.getValue();
-                if (requiredNodeIds.contains(joinedNodeId)) {
-                    requiredNodeIds.remove(joinedNodeId);
-                    if (requiredNodeIds.isEmpty()) {
-                        feedsThatCanBeRevived.add(entry.getKey());
-                    }
-                }
-            }
-            if (!feedsThatCanBeRevived.isEmpty()) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
-                }
-                FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
-                (new Thread(activator)).start();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        try {
-            responseInbox.put(response);
-        } catch (InterruptedException e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Interrupted exception");
-            }
-        }
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        switch (newState) {
-            case ACTIVE:
-                if (previousState.equals(ClusterState.UNUSABLE)) {
-                    try {
-                        FeedsActivator activator = new FeedsActivator();
-                        // (new Thread(activator)).start();
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                        }
-                    }
-                }
-                break;
-            default:
-                break;
-        }
-
-    }
-
-    public static class FeedsDeActivator implements Runnable {
-
-        private List<FeedConnectJobInfo> failedConnectjobs;
-
-        public FeedsDeActivator(List<FeedConnectJobInfo> failedConnectjobs) {
-            this.failedConnectjobs = failedConnectjobs;
-        }
-
-        @Override
-        public void run() {
-            for (FeedConnectJobInfo failedConnectJob : failedConnectjobs) {
-                endFeed(failedConnectJob);
-            }
-        }
-
-        private void endFeed(FeedConnectJobInfo cInfo) {
-            MetadataTransactionContext ctx = null;
-            PrintWriter writer = new PrintWriter(System.out, true);
-            SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-
-            try {
-                ctx = MetadataManager.INSTANCE.beginTransaction();
-                FeedId feedId = cInfo.getConnectionId().getFeedId();
-                DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(feedId.getDataverse()),
-                        new Identifier(feedId.getFeedName()), new Identifier(cInfo.getConnectionId().getDatasetName()));
-                List<Statement> statements = new ArrayList<Statement>();
-                DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedId.getDataverse()));
-                statements.add(dataverseDecl);
-                statements.add(stmt);
-                QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                        QueryTranslator.ResultDelivery.SYNC);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("End irrecoverable feed: " + cInfo.getConnectionId());
-                }
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Exception in ending loser feed: " + cInfo.getConnectionId() + " Exception "
-                            + e.getMessage());
-                }
-                e.printStackTrace();
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
-                    }
-                }
-
-            }
-
-        }
-    }
-
-    public void submitFeedConnectionRequest(IFeedJoint feedPoint, FeedConnectionRequest subscriptionRequest)
-            throws Exception {
-        feedJobNotificationHandler.submitFeedConnectionRequest(feedPoint, subscriptionRequest);
-    }
-
-    @Override
-    public List<FeedConnectionId> getActiveFeedConnections(FeedId feedId) {
-        List<FeedConnectionId> connections = new ArrayList<FeedConnectionId>();
-        Collection<FeedConnectionId> activeConnections = feedJobNotificationHandler.getActiveFeedConnections();
-        if (feedId != null) {
-            for (FeedConnectionId connectionId : activeConnections) {
-                if (connectionId.getFeedId().equals(feedId)) {
-                    connections.add(connectionId);
-                }
-            }
-        } else {
-            connections.addAll(activeConnections);
-        }
-        return connections;
-    }
-
-    @Override
-    public List<String> getComputeLocations(FeedId feedId) {
-        return feedJobNotificationHandler.getFeedComputeLocations(feedId);
-    }
-
-    @Override
-    public List<String> getIntakeLocations(FeedId feedId) {
-        return feedJobNotificationHandler.getFeedIntakeLocations(feedId);
-    }
-
-    @Override
-    public List<String> getStoreLocations(FeedConnectionId feedConnectionId) {
-        return feedJobNotificationHandler.getFeedStorageLocations(feedConnectionId);
-    }
-
-    @Override
-    public List<String> getCollectLocations(FeedConnectionId feedConnectionId) {
-        return feedJobNotificationHandler.getFeedCollectLocations(feedConnectionId);
-    }
-
-    @Override
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
-    }
-
-    public void reportPartialDisconnection(FeedConnectionId connectionId) {
-        feedJobNotificationHandler.removeFeedJointsPostPipelineTermination(connectionId);
-    }
-
-    public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
-        feedReportQueue.put(feedId, queue);
-    }
-
-    public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
-        feedReportQueue.remove(feedId);
-    }
-
-    public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
-        return feedReportQueue.get(feedId);
-    }
-
-    @Override
-    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
-        return feedJobNotificationHandler.getAvailableFeedJoint(feedJointKey);
-    }
-
-    @Override
-    public boolean isFeedJointAvailable(FeedJointKey feedJointKey) {
-        return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey);
-    }
-
-    public void registerFeedJoint(IFeedJoint feedJoint) {
-        feedJobNotificationHandler.registerFeedJoint(feedJoint);
-    }
-
-    public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
-        return feedJobNotificationHandler.getFeedJoint(feedJointKey);
-    }
-
-    @Override
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
-        feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, subscriber);
-    }
-
-    @Override
-    public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
-        feedJobNotificationHandler.deregisterFeedEventSubscriber(connectionId, subscriber);
-
-    }
-
-    public JobSpecification getCollectJobSpecification(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.getCollectJobSpecification(connectionId);
-    }
-
-    public JobId getFeedCollectJobId(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
deleted file mode 100644
index cb3133e..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedLoadManager.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedActivity;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedJobInfo.FeedJobState;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.NodeLoadReport;
-import org.apache.asterix.common.feeds.api.IFeedLoadManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedTrackingManager;
-import org.apache.asterix.common.feeds.message.FeedCongestionMessage;
-import org.apache.asterix.common.feeds.message.FeedReportMessage;
-import org.apache.asterix.common.feeds.message.ScaleInReportMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.file.FeedOperations;
-import org.apache.asterix.metadata.feeds.FeedUtil;
-import org.apache.asterix.metadata.feeds.PrepareStallMessage;
-import org.apache.asterix.metadata.feeds.TerminateDataFlowMessage;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedLoadManager implements IFeedLoadManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
-
-    private static final long MIN_MODIFICATION_INTERVAL = 180000; // 180 seconds (3 minutes), in milliseconds
-    private final TreeSet<NodeLoadReport> nodeReports;
-    private final Map<FeedConnectionId, FeedActivity> feedActivities;
-    private final Map<String, Pair<Integer, Integer>> feedMetrics;
-
-    private FeedConnectionId lastModified;
-    private long lastModifiedTimestamp;
-
-    private static final int UNKNOWN = -1;
-
-    public FeedLoadManager() {
-        this.nodeReports = new TreeSet<NodeLoadReport>();
-        this.feedActivities = new HashMap<FeedConnectionId, FeedActivity>();
-        this.feedMetrics = new HashMap<String, Pair<Integer, Integer>>();
-    }
-
-    @Override
-    public void submitNodeLoadReport(NodeLoadReport report) {
-        nodeReports.remove(report);
-        nodeReports.add(report);
-    }
-
-    @Override
-    public void reportCongestion(FeedCongestionMessage message) throws AsterixException {
-        FeedRuntimeId runtimeId = message.getRuntimeId();
-        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
-        if (jobState == null
-                || (jobState.equals(FeedJobState.UNDER_RECOVERY))
-                || (message.getConnectionId().equals(lastModified) && System.currentTimeMillis()
-                        - lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ignoring congestion report from " + runtimeId);
-            }
-            return;
-        } else {
-            try {
-                FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
-                int inflowRate = message.getInflowRate();
-                int outflowRate = message.getOutflowRate();
-                List<String> currentComputeLocations = new ArrayList<String>();
-                currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message
-                        .getConnectionId().getFeedId()));
-                int computeCardinality = currentComputeLocations.size();
-                int requiredCardinality = (int) Math
-                        .ceil((double) ((computeCardinality * inflowRate) / (double) outflowRate)) + 5;
-                int additionalComputeNodes = requiredCardinality - computeCardinality;
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + computeCardinality + " by "
-                            + additionalComputeNodes);
-                }
-
-                List<String> helperComputeNodes = getNodeForSubstitution(additionalComputeNodes);
-
-                // Step 1) Alter the original feed job to adjust the cardinality
-                JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
-                        .getConnectionId());
-                helperComputeNodes.addAll(currentComputeLocations);
-                List<String> newLocations = new ArrayList<String>();
-                newLocations.addAll(currentComputeLocations);
-                newLocations.addAll(helperComputeNodes);
-                FeedUtil.increaseCardinality(jobSpec, FeedRuntimeType.COMPUTE, requiredCardinality, newLocations);
-
-                // Step 2) gracefully terminate the running data flow (prepare-to-stall, then terminate)
-                gracefullyTerminateDataFlow(message.getConnectionId(), Integer.MAX_VALUE);
-
-                // Step 3) run the altered job specification 
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("New Job after adjusting to the workload " + jobSpec);
-                }
-
-                Thread.sleep(10000);
-                runJob(jobSpec, false);
-                lastModified = message.getConnectionId();
-                lastModifiedTimestamp = System.currentTimeMillis();
-
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
-                }
-                throw new AsterixException(e);
-            }
-        }
-    }
-
-    @Override
-    public void submitScaleInPossibleReport(ScaleInReportMessage message) throws Exception {
-        FeedJobState jobState = FeedLifecycleListener.INSTANCE.getFeedJobState(message.getConnectionId());
-        if (jobState == null || (jobState.equals(FeedJobState.UNDER_RECOVERY))) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("JobState information for job " + "[" + message.getConnectionId() + "]" + " not found ");
-            }
-            return;
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Processing scale-in message " + message);
-            }
-            FeedLifecycleListener.INSTANCE.setJobState(message.getConnectionId(), FeedJobState.UNDER_RECOVERY);
-            JobSpecification jobSpec = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(message
-                    .getConnectionId());
-            int reducedCardinality = message.getReducedCardinaliy();
-            List<String> currentComputeLocations = new ArrayList<String>();
-            currentComputeLocations.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(message.getConnectionId()
-                    .getFeedId()));
-            FeedUtil.decreaseComputeCardinality(jobSpec, FeedRuntimeType.COMPUTE, reducedCardinality,
-                    currentComputeLocations);
-
-            gracefullyTerminateDataFlow(message.getConnectionId(), reducedCardinality - 1);
-            Thread.sleep(3000);
-            JobId newJobId = runJob(jobSpec, false);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Launch modified job" + "[" + newJobId + "]" + "for scale-in \n" + jobSpec);
-            }
-
-        }
-    }
-
-    private void gracefullyTerminateDataFlow(FeedConnectionId connectionId, int computePartitionRetainLimit)
-            throws Exception {
-        // Step 1) send prepare to  stall message
-        PrepareStallMessage stallMessage = new PrepareStallMessage(connectionId, computePartitionRetainLimit);
-        List<String> intakeLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-        List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(connectionId.getFeedId());
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-
-        Set<String> operatorLocations = new HashSet<String>();
-
-        operatorLocations.addAll(intakeLocations);
-        operatorLocations.addAll(computeLocations);
-        operatorLocations.addAll(storageLocations);
-
-        JobSpecification messageJobSpec = FeedOperations.buildPrepareStallMessageJob(stallMessage, operatorLocations);
-        runJob(messageJobSpec, true);
-
-        // Step 2)
-        TerminateDataFlowMessage terminateMesg = new TerminateDataFlowMessage(connectionId);
-        messageJobSpec = FeedOperations.buildTerminateFlowMessageJob(terminateMesg, intakeLocations);
-        runJob(messageJobSpec, true);
-    }
-
-    public static JobId runJob(JobSpecification spec, boolean waitForCompletion) throws Exception {
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-        JobId jobId = hcc.startJob(spec);
-        if (waitForCompletion) {
-            hcc.waitForCompletion(jobId);
-        }
-        return jobId;
-    }
-
-    @Override
-    public void submitFeedRuntimeReport(FeedReportMessage report) {
-        String key = "" + report.getConnectionId() + ":" + report.getRuntimeId().getFeedRuntimeType();
-        Pair<Integer, Integer> value = feedMetrics.get(key);
-        if (value == null) {
-            value = new Pair<Integer, Integer>(report.getValue(), 1);
-            feedMetrics.put(key, value);
-        } else {
-            value.first = value.first + report.getValue();
-            value.second = value.second + 1;
-        }
-    }
-
-    @Override
-    public int getOutflowRate(FeedConnectionId connectionId, FeedRuntimeType runtimeType) {
-        int rVal;
-        String key = "" + connectionId + ":" + runtimeType;
-        feedMetrics.get(key);
-        Pair<Integer, Integer> value = feedMetrics.get(key);
-        if (value == null) {
-            rVal = UNKNOWN;
-        } else {
-            rVal = value.first / value.second;
-        }
-        return rVal;
-    }
-
-    private List<String> getNodeForSubstitution(int nRequired) {
-        List<String> nodeIds = new ArrayList<String>();
-        Iterator<NodeLoadReport> it = null;
-        int nAdded = 0;
-        while (nAdded < nRequired) {
-            it = nodeReports.iterator();
-            while (it.hasNext()) {
-                nodeIds.add(it.next().getNodeId());
-                nAdded++;
-            }
-        }
-        return nodeIds;
-    }
-
-    @Override
-    public synchronized List<String> getNodes(int required) {
-        Iterator<NodeLoadReport> it;
-        List<String> allocated = new ArrayList<String>();
-        while (allocated.size() < required) {
-            it = nodeReports.iterator();
-            while (it.hasNext() && allocated.size() < required) {
-                allocated.add(it.next().getNodeId());
-            }
-        }
-        return allocated;
-    }
-
-    @Override
-    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage mesg) throws AsterixException, Exception {
-        System.out.println("Throttling Enabled for " + mesg.getConnectionId() + " " + mesg.getFeedRuntimeId());
-        FeedConnectionId connectionId = mesg.getConnectionId();
-        List<String> destinationLocations = new ArrayList<String>();
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-
-        destinationLocations.addAll(storageLocations);
-        destinationLocations.addAll(collectLocations);
-        JobSpecification messageJobSpec = FeedOperations.buildNotifyThrottlingEnabledMessageJob(mesg,
-                destinationLocations);
-        runJob(messageJobSpec, true);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.warning("Acking disabled for " + mesg.getConnectionId() + " in view of activated throttling");
-        }
-        IFeedTrackingManager trackingManager = CentralFeedManager.getInstance().getFeedTrackingManager();
-        trackingManager.disableAcking(connectionId);
-    }
-
-    @Override
-    public void reportFeedActivity(FeedConnectionId connectionId, FeedActivity activity) {
-        feedActivities.put(connectionId, activity);
-    }
-
-    @Override
-    public FeedActivity getFeedActivity(FeedConnectionId connectionId) {
-        return feedActivities.get(connectionId);
-    }
-
-    @Override
-    public Collection<FeedActivity> getFeedActivities() {
-        return feedActivities.values();
-    }
-
-    @Override
-    public void removeFeedActivity(FeedConnectionId connectionId) {
-        feedActivities.remove(connectionId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
deleted file mode 100644
index 7f9afb8..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedMemoryManager;
-import org.apache.asterix.common.feeds.FeedMessageService;
-import org.apache.asterix.common.feeds.FeedMetricCollector;
-import org.apache.asterix.common.feeds.NodeLoadReportService;
-import org.apache.asterix.common.feeds.api.IFeedConnectionManager;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-import org.apache.asterix.common.feeds.api.IFeedMetadataManager;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.metadata.feeds.FeedConnectionManager;
-import org.apache.asterix.metadata.feeds.FeedSubscriptionManager;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An implementation of the IFeedManager interface.
- * Provider necessary central repository for registering/retrieving
- * artifacts/services associated with a feed.
- */
-public class FeedManager implements IFeedManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
-
-    private final IFeedSubscriptionManager feedSubscriptionManager;
-
-    private final IFeedConnectionManager feedConnectionManager;
-
-    private final IFeedMemoryManager feedMemoryManager;
-
-    private final IFeedMetricCollector feedMetricCollector;
-
-    private final IFeedMetadataManager feedMetadataManager;
-
-    private final IFeedMessageService feedMessageService;
-
-    private final NodeLoadReportService nodeLoadReportService;
-
-    private final AsterixFeedProperties asterixFeedProperties;
-
-    private final String nodeId;
-
-    private final int frameSize;
-
-    public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) throws AsterixException, HyracksDataException {
-        this.nodeId = nodeId;
-        this.feedSubscriptionManager = new FeedSubscriptionManager(nodeId);
-        this.feedConnectionManager = new FeedConnectionManager(nodeId);
-        this.feedMetadataManager = new FeedMetadataManager(nodeId);
-        this.feedMemoryManager = new FeedMemoryManager(nodeId, feedProperties, frameSize);
-        String ccClusterIp = AsterixClusterProperties.INSTANCE.getCluster() != null ? AsterixClusterProperties.INSTANCE
-                .getCluster().getMasterNode().getClusterIp() : "localhost";
-        this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
-        this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
-        try {
-            this.feedMessageService.start();
-            this.nodeLoadReportService.start();
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to start feed services " + e.getMessage());
-            }
-            e.printStackTrace();
-        }
-        this.feedMetricCollector = new FeedMetricCollector(nodeId);
-        this.frameSize = frameSize;
-        this.asterixFeedProperties = feedProperties;
-    }
-
-    @Override
-    public IFeedSubscriptionManager getFeedSubscriptionManager() {
-        return feedSubscriptionManager;
-    }
-
-    @Override
-    public IFeedConnectionManager getFeedConnectionManager() {
-        return feedConnectionManager;
-    }
-
-    @Override
-    public IFeedMemoryManager getFeedMemoryManager() {
-        return feedMemoryManager;
-    }
-
-    @Override
-    public IFeedMetricCollector getFeedMetricCollector() {
-        return feedMetricCollector;
-    }
-
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-    @Override
-    public IFeedMetadataManager getFeedMetadataManager() {
-        return feedMetadataManager;
-    }
-
-    @Override
-    public IFeedMessageService getFeedMessageService() {
-        return feedMessageService;
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        return "FeedManager " + "[" + nodeId + "]";
-    }
-
-    @Override
-    public AsterixFeedProperties getAsterixFeedProperties() {
-        return asterixFeedProperties;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
deleted file mode 100644
index a3cd217..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMessageReceiver.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.logging.Level;
-
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedTupleCommitAckMessage;
-import org.apache.asterix.common.feeds.MessageReceiver;
-import org.apache.asterix.common.feeds.NodeLoadReport;
-import org.apache.asterix.common.feeds.api.IFeedLoadManager;
-import org.apache.asterix.common.feeds.api.IFeedMessage.MessageType;
-import org.apache.asterix.common.feeds.api.IFeedTrackingManager;
-import org.apache.asterix.common.feeds.message.FeedCongestionMessage;
-import org.apache.asterix.common.feeds.message.FeedReportMessage;
-import org.apache.asterix.common.feeds.message.ScaleInReportMessage;
-import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
-import org.apache.asterix.feeds.CentralFeedManager.AQLExecutor;
-import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
-
-public class FeedMessageReceiver extends MessageReceiver<String> {
-
-    private static boolean initialized;
-
-    private final IFeedLoadManager feedLoadManager;
-    private final IFeedTrackingManager feedTrackingManager;
-
-    public FeedMessageReceiver(CentralFeedManager centralFeedManager) {
-        this.feedLoadManager = centralFeedManager.getFeedLoadManager();
-        this.feedTrackingManager = centralFeedManager.getFeedTrackingManager();
-    }
-
-    @Override
-    public void processMessage(String message) throws Exception {
-        JSONObject obj = new JSONObject(message);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message " + obj);
-        }
-        MessageType messageType = MessageType.valueOf(obj.getString(FeedConstants.MessageConstants.MESSAGE_TYPE));
-        switch (messageType) {
-            case XAQL:
-                if (!initialized) {
-                    FeedBootstrap.setUpInitialArtifacts();
-                    initialized = true;
-                }
-                AQLExecutor.executeAQL(obj.getString(FeedConstants.MessageConstants.AQL));
-                break;
-            case CONGESTION:
-                feedLoadManager.reportCongestion(FeedCongestionMessage.read(obj));
-                break;
-            case FEED_REPORT:
-                feedLoadManager.submitFeedRuntimeReport(FeedReportMessage.read(obj));
-                break;
-            case NODE_REPORT:
-                feedLoadManager.submitNodeLoadReport(NodeLoadReport.read(obj));
-                break;
-            case SCALE_IN_REQUEST:
-                feedLoadManager.submitScaleInPossibleReport(ScaleInReportMessage.read(obj));
-                break;
-            case STORAGE_REPORT:
-                FeedLifecycleListener.INSTANCE.updateTrackingInformation(StorageReportFeedMessage.read(obj));
-                break;
-            case COMMIT_ACK:
-                feedTrackingManager.submitAckReport(FeedTupleCommitAckMessage.read(obj));
-                break;
-            case THROTTLING_ENABLED:
-                feedLoadManager.reportThrottlingEnabled(ThrottlingEnabledFeedMessage.read(obj));
-            default:
-                break;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
deleted file mode 100644
index 81cabeb..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedMetadataManager.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.Date;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMetadataManager;
-import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
-import org.apache.asterix.metadata.feeds.XAQLFeedMessage;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedMetadataManager implements IFeedMetadataManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
-
-    private final String nodeId;
-    private ARecordType recordType;
-
-    public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
-        this.nodeId = nodeId;
-        String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
-                "timestamp" };
-        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
-
-        recordType = new ARecordType(FeedBootstrap.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
-    }
-
-    @Override
-    public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
-            throws AsterixException {
-        try {
-            AString id = new AString("1");
-            AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
-            AString feedValue = new AString(connectionId.getFeedId().getFeedName());
-            AString targetDatasetValue = new AString(connectionId.getDatasetName());
-            AString tupleValue = new AString(tuple);
-            AString messageValue = new AString(message);
-            AString dateTime = new AString(new Date().toString());
-
-            IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
-                    messageValue, dateTime };
-            ARecord record = new ARecord(recordType, fields);
-            StringBuilder builder = new StringBuilder();
-            builder.append("use dataverse " + FeedBootstrap.FEEDS_METADATA_DV + ";" + "\n");
-            builder.append("insert into dataset " + FeedBootstrap.FAILED_TUPLE_DATASET + " ");
-            builder.append(" (" + recordToString(record) + ")");
-            builder.append(";");
-
-            XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
-            feedManager.getFeedMessageService().sendMessage(xAqlMessage);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(" Sent " + xAqlMessage.toJSON());
-            }
-        } catch (Exception pe) {
-            throw new AsterixException(pe);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMetadataManager [" + nodeId + "]";
-    }
-
-    private String recordToString(ARecord record) {
-        String[] fieldNames = record.getType().getFieldNames();
-        StringBuilder sb = new StringBuilder();
-        sb.append("{ ");
-        for (int i = 0; i < fieldNames.length; i++) {
-            if (i > 0) {
-                sb.append(", ");
-            }
-            sb.append("\"" + fieldNames[i] + "\"");
-            sb.append(": ");
-            switch (record.getType().getFieldTypes()[i].getTypeTag()) {
-                case STRING:
-                    sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
-                    break;
-                default:
-                    break;
-            }
-        }
-        sb.append(" }");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
deleted file mode 100644
index d57a971..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedTrackingManager.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedTupleCommitAckMessage;
-import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
-import org.apache.asterix.common.feeds.api.IFeedTrackingManager;
-import org.apache.asterix.file.FeedOperations;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedTrackingManager implements IFeedTrackingManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
-
-    private final BitSet allOnes;
-
-    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
-    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;
-
-    public FeedTrackingManager() {
-        byte[] allOneBytes = new byte[128];
-        Arrays.fill(allOneBytes, (byte) 0xff);
-        allOnes = BitSet.valueOf(allOneBytes);
-        ackHistory = new HashMap<FeedConnectionId, Map<AckId, BitSet>>();
-        maxBaseAcked = new HashMap<FeedConnectionId, Map<AckId, Integer>>();
-    }
-
-    @Override
-    public synchronized void submitAckReport(FeedTupleCommitAckMessage ackMessage) {
-        AckId ackId = getAckId(ackMessage);
-        Map<AckId, BitSet> acksForConnection = ackHistory.get(ackMessage.getConnectionId());
-        if (acksForConnection == null) {
-            acksForConnection = new HashMap<AckId, BitSet>();
-            acksForConnection.put(ackId, BitSet.valueOf(ackMessage.getCommitAcks()));
-            ackHistory.put(ackMessage.getConnectionId(), acksForConnection);
-        }
-        BitSet currentAcks = acksForConnection.get(ackId);
-        if (currentAcks == null) {
-            currentAcks = BitSet.valueOf(ackMessage.getCommitAcks());
-            acksForConnection.put(ackId, currentAcks);
-        } else {
-            currentAcks.or(BitSet.valueOf(ackMessage.getCommitAcks()));
-        }
-        if (Arrays.equals(currentAcks.toByteArray(), allOnes.toByteArray())) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(ackMessage.getIntakePartition() + " (" + ackMessage.getBase() + ")" + " is convered");
-            }
-            Map<AckId, Integer> maxBaseAckedForConnection = maxBaseAcked.get(ackMessage.getConnectionId());
-            if (maxBaseAckedForConnection == null) {
-                maxBaseAckedForConnection = new HashMap<AckId, Integer>();
-                maxBaseAcked.put(ackMessage.getConnectionId(), maxBaseAckedForConnection);
-            }
-            Integer maxBaseAckedValue = maxBaseAckedForConnection.get(ackId);
-            if (maxBaseAckedValue == null) {
-                maxBaseAckedValue = ackMessage.getBase();
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else if (ackMessage.getBase() == maxBaseAckedValue + 1) {
-                maxBaseAckedForConnection.put(ackId, ackMessage.getBase());
-                sendCommitResponseMessage(ackMessage.getConnectionId(), ackMessage.getIntakePartition(),
-                        ackMessage.getBase());
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ignoring discountiuous acked base " + ackMessage.getBase() + " for " + ackId);
-                }
-            }
-
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("AckId " + ackId + " pending number of acks " + (128 * 8 - currentAcks.cardinality()));
-            }
-        }
-    }
-
-    public synchronized void disableTracking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-    }
-
-    private void sendCommitResponseMessage(FeedConnectionId connectionId, int partition, int base) {
-        FeedTupleCommitResponseMessage response = new FeedTupleCommitResponseMessage(connectionId, partition, base);
-        List<String> storageLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
-        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
-        String collectLocation = collectLocations.get(partition);
-        Set<String> messageDestinations = new HashSet<String>();
-        messageDestinations.add(collectLocation);
-        messageDestinations.addAll(storageLocations);
-        try {
-            JobSpecification spec = FeedOperations.buildCommitAckResponseJob(response, messageDestinations);
-            CentralFeedManager.runJob(spec, false);
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to send commit response message " + response + " exception " + e.getMessage());
-            }
-        }
-    }
-
-    private static AckId getAckId(FeedTupleCommitAckMessage ackMessage) {
-        return new AckId(ackMessage.getConnectionId(), ackMessage.getIntakePartition(), ackMessage.getBase());
-    }
-
-    private static class AckId {
-        private FeedConnectionId connectionId;
-        private int intakePartition;
-        private int base;
-
-        public AckId(FeedConnectionId connectionId, int intakePartition, int base) {
-            this.connectionId = connectionId;
-            this.intakePartition = intakePartition;
-            this.base = base;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (!(o instanceof AckId)) {
-                return false;
-            }
-            AckId other = (AckId) o;
-            return other.getConnectionId().equals(connectionId) && other.getIntakePartition() == intakePartition
-                    && other.getBase() == base;
-        }
-
-        @Override
-        public String toString() {
-            return connectionId + "[" + intakePartition + "]" + "(" + base + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return toString().hashCode();
-        }
-
-        public FeedConnectionId getConnectionId() {
-            return connectionId;
-        }
-
-        public int getIntakePartition() {
-            return intakePartition;
-        }
-
-        public int getBase() {
-            return base;
-        }
-
-    }
-
-    @Override
-    public void disableAcking(FeedConnectionId connectionId) {
-        ackHistory.remove(connectionId);
-        maxBaseAcked.remove(connectionId);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Acking disabled for " + connectionId);
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
deleted file mode 100644
index 3686a03..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedWorkRequestResponseHandler.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.feeds.FeedConnectJobInfo;
-import org.apache.asterix.common.feeds.FeedIntakeInfo;
-import org.apache.asterix.common.feeds.FeedJobInfo;
-import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedWorkRequestResponseHandler implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
-
-    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
-
-    private Map<Integer, Map<String, List<FeedJobInfo>>> feedsWaitingForResponse = new HashMap<Integer, Map<String, List<FeedJobInfo>>>();
-
-    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            IClusterManagementWorkResponse response = null;
-            try {
-                response = inbox.take();
-            } catch (InterruptedException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Interrupted exception " + e.getMessage());
-                }
-            }
-            IClusterManagementWork submittedWork = response.getWork();
-            Map<String, String> nodeSubstitution = new HashMap<String, String>();
-            switch (submittedWork.getClusterManagementWorkType()) {
-                case ADD_NODE:
-                    AddNodeWork addNodeWork = (AddNodeWork) submittedWork;
-                    int workId = addNodeWork.getWorkId();
-                    Map<String, List<FeedJobInfo>> failureAnalysis = feedsWaitingForResponse.get(workId);
-                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
-                    List<String> nodesAdded = resp.getNodesAdded();
-                    List<String> unsubstitutedNodes = new ArrayList<String>();
-                    unsubstitutedNodes.addAll(addNodeWork.getDeadNodes());
-                    int nodeIndex = 0;
-
-                    /** form a mapping between the failed node and its substitute **/
-                    if (nodesAdded != null && nodesAdded.size() > 0) {
-                        for (String failedNodeId : addNodeWork.getDeadNodes()) {
-                            String substitute = nodesAdded.get(nodeIndex);
-                            nodeSubstitution.put(failedNodeId, substitute);
-                            nodeIndex = (nodeIndex + 1) % nodesAdded.size();
-                            unsubstitutedNodes.remove(failedNodeId);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Node " + substitute + " chosen to substiute lost node " + failedNodeId);
-                            }
-                        }
-                    }
-                    if (unsubstitutedNodes.size() > 0) {
-                        String[] participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes()
-                                .toArray(new String[] {});
-                        nodeIndex = 0;
-                        for (String unsubstitutedNode : unsubstitutedNodes) {
-                            nodeSubstitution.put(unsubstitutedNode, participantNodes[nodeIndex]);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Node " + participantNodes[nodeIndex] + " chosen to substiute lost node "
-                                        + unsubstitutedNode);
-                            }
-                            nodeIndex = (nodeIndex + 1) % participantNodes.length;
-                        }
-
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Request " + resp.getWork() + " completed using internal nodes");
-                        }
-                    }
-
-                    // alter failed feed intake jobs
-
-                    for (Entry<String, List<FeedJobInfo>> entry : failureAnalysis.entrySet()) {
-                        String failedNode = entry.getKey();
-                        List<FeedJobInfo> impactedJobInfos = entry.getValue();
-                        for (FeedJobInfo info : impactedJobInfos) {
-                            JobSpecification spec = info.getSpec();
-                            replaceNode(spec, failedNode, nodeSubstitution.get(failedNode));
-                            info.setSpec(spec);
-                        }
-                    }
-
-                    Set<FeedIntakeInfo> revisedIntakeJobs = new HashSet<FeedIntakeInfo>();
-                    Set<FeedConnectJobInfo> revisedConnectJobInfos = new HashSet<FeedConnectJobInfo>();
-
-                    for (List<FeedJobInfo> infos : failureAnalysis.values()) {
-                        for (FeedJobInfo info : infos) {
-                            switch (info.getJobType()) {
-                                case INTAKE:
-                                    revisedIntakeJobs.add((FeedIntakeInfo) info);
-                                    break;
-                                case FEED_CONNECT:
-                                    revisedConnectJobInfos.add((FeedConnectJobInfo) info);
-                                    break;
-                            }
-                        }
-                    }
-
-                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-                    try {
-                        for (FeedIntakeInfo info : revisedIntakeJobs) {
-                            hcc.startJob(info.getSpec());
-                        }
-                        Thread.sleep(2000);
-                        for (FeedConnectJobInfo info : revisedConnectJobInfos) {
-                            hcc.startJob(info.getSpec());
-                            Thread.sleep(2000);
-                        }
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to start revised job post failure");
-                        }
-                    }
-
-                    break;
-                case REMOVE_NODE:
-                    throw new IllegalStateException("Invalid work submitted");
-            }
-        }
-    }
-
-    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
-        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
-        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
-        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
-        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
-        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
-        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
-        OperatorDescriptorId opId = null;
-        for (Constraint constraint : userConstraints) {
-            LValueConstraintExpression lexpr = constraint.getLValue();
-            ConstraintExpression cexpr = constraint.getRValue();
-            switch (lexpr.getTag()) {
-                case PARTITION_COUNT:
-                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    if (modifiedOperators.contains(opId)) {
-                        countConstraintsToReplace.add(constraint);
-                    } else {
-                        List<Constraint> clist = candidateConstraints.get(opId);
-                        if (clist == null) {
-                            clist = new ArrayList<Constraint>();
-                            candidateConstraints.put(opId, clist);
-                        }
-                        clist.add(constraint);
-                    }
-                    break;
-                case PARTITION_LOCATION:
-                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                    if (oldLocation.equals(failedNodeId)) {
-                        locationConstraintsToReplace.add(constraint);
-                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
-                        Map<Integer, String> newLocs = newConstraints.get(opId);
-                        if (newLocs == null) {
-                            newLocs = new HashMap<Integer, String>();
-                            newConstraints.put(opId, newLocs);
-                        }
-                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                        newLocs.put(partition, replacementNode);
-                    } else {
-                        if (modifiedOperators.contains(opId)) {
-                            locationConstraintsToReplace.add(constraint);
-                            Map<Integer, String> newLocs = newConstraints.get(opId);
-                            if (newLocs == null) {
-                                newLocs = new HashMap<Integer, String>();
-                                newConstraints.put(opId, newLocs);
-                            }
-                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                            newLocs.put(partition, oldLocation);
-                        } else {
-                            List<Constraint> clist = candidateConstraints.get(opId);
-                            if (clist == null) {
-                                clist = new ArrayList<Constraint>();
-                                candidateConstraints.put(opId, clist);
-                            }
-                            clist.add(constraint);
-                        }
-                    }
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
-        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
-
-        for (OperatorDescriptorId mopId : modifiedOperators) {
-            List<Constraint> clist = candidateConstraints.get(mopId);
-            if (clist != null && !clist.isEmpty()) {
-                jobSpec.getUserConstraints().removeAll(clist);
-
-                for (Constraint c : clist) {
-                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
-                        ConstraintExpression cexpr = c.getRValue();
-                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
-                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                        newConstraints.get(mopId).put(partition, oldLocation);
-                    }
-                }
-            }
-        }
-
-        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
-            OperatorDescriptorId nopId = entry.getKey();
-            Map<Integer, String> clist = entry.getValue();
-            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
-            String[] locations = new String[clist.size()];
-            for (int i = 0; i < locations.length; i++) {
-                locations[i] = clist.get(i);
-            }
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
-        }
-
-    }
-
-    public void registerFeedWork(int workId, Map<String, List<FeedJobInfo>> impactedJobs) {
-        feedsWaitingForResponse.put(workId, impactedJobs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java b/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
deleted file mode 100644
index 7660007..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/feeds/FeedsActivator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.feeds;
-
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.api.common.SessionConfig;
-import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.aql.translator.QueryTranslator;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
-import org.apache.asterix.lang.common.statement.DataverseDecl;
-import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.job.JobId;
-
-public class FeedsActivator implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
-    private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
-
-    private List<FeedCollectInfo> feedsToRevive;
-    private Mode mode;
-
-    public enum Mode {
-        REVIVAL_POST_CLUSTER_REBOOT,
-        REVIVAL_POST_NODE_REJOIN
-    }
-
-    public FeedsActivator() {
-        this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
-    }
-
-    public FeedsActivator(List<FeedCollectInfo> feedsToRevive) {
-        this.feedsToRevive = feedsToRevive;
-        this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
-    }
-
-    @Override
-    public void run() {
-        switch (mode) {
-            case REVIVAL_POST_CLUSTER_REBOOT:
-                //revivePostClusterReboot();
-                break;
-            case REVIVAL_POST_NODE_REJOIN:
-                try {
-                    Thread.sleep(10000);
-                } catch (InterruptedException e1) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Attempt to resume feed interrupted");
-                    }
-                    throw new IllegalStateException(e1.getMessage());
-                }
-                for (FeedCollectInfo finfo : feedsToRevive) {
-                    try {
-                        JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
-                            LOGGER.info("Job:" + finfo.jobSpec);
-                        }
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
-                        }
-                    }
-                }
-        }
-    }
-
-    public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
-        PrintWriter writer = new PrintWriter(System.out, true);
-        SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
-        try {
-            DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
-            ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse), new Identifier(feedName),
-                    new Identifier(dataset), feedPolicy, 0);
-            stmt.setForceConnect(true);
-            List<Statement> statements = new ArrayList<Statement>();
-            statements.add(dataverseDecl);
-            statements.add(stmt);
-            QueryTranslator translator = new QueryTranslator(statements, pc, compilationProvider);
-            translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                    QueryTranslator.ResultDelivery.SYNC);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
-                        + feedPolicy + " Exception " + e.getMessage());
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
index 2ee9dd4..77c6a54 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
@@ -44,6 +44,7 @@ import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.FilesIndexDescription;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
 import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
@@ -60,7 +61,6 @@ import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.om.types.ARecordType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
index cb55c5f..6a036c0 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
@@ -22,24 +22,24 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConnectJobInfo;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.message.EndFeedMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
 import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.feeds.FeedLifecycleListener;
+import org.apache.asterix.external.feed.api.IFeedJoint;
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.EndFeedMessage;
+import org.apache.asterix.external.feed.message.FeedTupleCommitResponseMessage;
+import org.apache.asterix.external.feed.message.PrepareStallMessage;
+import org.apache.asterix.external.feed.message.TerminateDataFlowMessage;
+import org.apache.asterix.external.feed.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
+import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor;
+import org.apache.asterix.external.util.FeedConstants;
+import org.apache.asterix.feed.FeedLifecycleListener;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.PrepareStallMessage;
-import org.apache.asterix.metadata.feeds.TerminateDataFlowMessage;
+import org.apache.asterix.metadata.entities.Feed;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -63,7 +63,7 @@ public class FeedOperations {
      * @return JobSpecification the Hyracks job specification for receiving data from external source
      * @throws Exception
      */
-    public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(PrimaryFeed primaryFeed,
+    public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed,
             AqlMetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
 
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index a579d2c..6d23f3c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -31,9 +31,9 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;