Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:17 UTC

[20/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
deleted file mode 100644
index 4cdc91c..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ /dev/null
@@ -1,1188 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.hyracks.bootstrap;
-
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.StringUtils;
-
-import edu.uci.ics.asterix.api.common.APIFramework;
-import edu.uci.ics.asterix.api.common.SessionConfig;
-import edu.uci.ics.asterix.aql.base.Statement;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DataverseDecl;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.Identifier;
-import edu.uci.ics.asterix.aql.translator.AqlTranslator;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
-import edu.uci.ics.asterix.event.schema.cluster.Cluster;
-import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.file.JobSpecificationUtils;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure.FailureType;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
-import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.Dataverse;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
-import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedManagerElectMessage;
-import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.MessageListener;
-import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
-import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
-import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobInfo;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
-
-/**
- * A listener that subscribes to events associated with cluster membership (nodes joining/leaving the cluster)
- * and the job lifecycle (start/end of a job). Subscribing to these events makes it possible to keep track of
- * feed ingestion jobs and to take corrective action when a node involved in a feed leaves the cluster.
- */
-public class FeedLifecycleListener implements IJobLifecycleListener, IClusterEventsSubscriber, Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
-
-    public static final FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
-
-    public static final int FEED_HEALTH_PORT = 2999;
-
-    private LinkedBlockingQueue<Message> jobEventInbox;
-    private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
-    private Map<FeedInfo, List<String>> dependentFeeds = new HashMap<FeedInfo, List<String>>();
-    private IMessageAnalyzer healthDataParser;
-    private MessageListener feedHealthDataListener;
-    private ExecutorService executorService = Executors.newCachedThreadPool();
-    private Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue = new HashMap<FeedConnectionId, LinkedBlockingQueue<String>>();
-    private State state;
-
-    private FeedLifecycleListener() {
-        jobEventInbox = new LinkedBlockingQueue<Message>();
-        feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
-        responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
-        feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
-        this.healthDataParser = new FeedHealthDataParser();
-        feedHealthDataListener = new MessageListener(FEED_HEALTH_PORT, healthDataParser.getMessageQueue());
-        try {
-            feedHealthDataListener.start();
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to start Feed health data listener");
-            }
-        }
-        executorService.execute(feedJobNotificationHandler);
-        executorService.execute(feedWorkRequestResponseHandler);
-        ClusterManager.INSTANCE.registerSubscriber(this);
-        state = AsterixClusterProperties.INSTANCE.getState();
-
-    }
-
-    private final FeedJobNotificationHandler feedJobNotificationHandler;
-    private final FeedWorkRequestResponseHandler feedWorkRequestResponseHandler;
-
-    @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
-        if (feedJobNotificationHandler.isRegisteredFeed(jobId)) {
-            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
-        }
-    }
-
-    @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
-        if (feedJobNotificationHandler.isRegisteredFeed(jobId)) {
-            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
-        }
-    }
-
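-    /**
-     * Invoked by Hyracks when a job is created. The job specification is scanned for a
-     * FeedIntakeOperatorDescriptor; if one is present, the job is a feed ingestion job and is
-     * registered together with its ingestion policy so that subsequent start/finish events
-     * can be correlated with the feed.
-     */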
-    @Override
-    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
-
-        JobSpecification spec = acggf.getJobSpecification();
-        boolean feedIngestionJob = false;
-        FeedConnectionId feedId = null;
-        Map<String, String> feedPolicy = null;
-        for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
-            if (!(opDesc instanceof FeedIntakeOperatorDescriptor)) {
-                continue;
-            }
-            feedId = ((FeedIntakeOperatorDescriptor) opDesc).getFeedId();
-            feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy();
-            feedIngestionJob = true;
-            break;
-        }
-        if (feedIngestionJob) {
-            feedJobNotificationHandler.registerFeed(feedId, jobId, spec, feedPolicy);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered feed: " + feedId + " ingestion policy "
-                        + feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
-            }
-        }
-
-    }
-
-    public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
-        feedReportQueue.put(feedId, queue);
-    }
-
-    public void deregisterFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
-        feedReportQueue.remove(feedId);
-    }
-
-    public LinkedBlockingQueue<String> getFeedReportQueue(FeedConnectionId feedId) {
-        return feedReportQueue.get(feedId);
-    }
-
-    private static class Message {
-        public JobId jobId;
-
-        public enum MessageKind {
-            JOB_START,
-            JOB_FINISH
-        }
-
-        public MessageKind messageKind;
-
-        public Message(JobId jobId, MessageKind msgKind) {
-            this.jobId = jobId;
-            this.messageKind = msgKind;
-        }
-    }
-
-    public static class FeedFailureReport {
-        public Map<FeedInfo, List<FeedFailure>> failures = new HashMap<FeedInfo, List<FeedFailure>>();
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            for (Map.Entry<FeedLifecycleListener.FeedInfo, List<FeedLifecycleListener.FeedFailure>> entry : failures
-                    .entrySet()) {
-                builder.append(entry.getKey()).append(" -> failures\n");
-                for (FeedFailure failure : entry.getValue()) {
-                    builder.append("failure -> ").append(failure).append("\n");
-                }
-            }
-            return builder.toString();
-        }
-    }
-
-    private static class FeedHealthDataParser implements IMessageAnalyzer {
-
-        private LinkedBlockingQueue<String> inbox = new LinkedBlockingQueue<String>();
-
-        @Override
-        public LinkedBlockingQueue<String> getMessageQueue() {
-            return inbox;
-        }
-
-    }
-
-    private static class FeedJobNotificationHandler implements Runnable, Serializable {
-
-        private static final long serialVersionUID = 1L;
-        private LinkedBlockingQueue<Message> inbox;
-        private Map<JobId, FeedInfo> registeredFeeds = new HashMap<JobId, FeedInfo>();
-        private FeedMessenger feedMessenger;
-        private LinkedBlockingQueue<FeedMessengerMessage> messengerOutbox;
-        private int superFeedManagerPort = 3000;
-
-        public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
-            this.inbox = inbox;
-            messengerOutbox = new LinkedBlockingQueue<FeedMessengerMessage>();
-            feedMessenger = new FeedMessenger(messengerOutbox);
-            (new Thread(feedMessenger)).start();
-        }
-
-        public boolean isRegisteredFeed(JobId jobId) {
-            return registeredFeeds.containsKey(jobId);
-        }
-
-        public void registerFeed(FeedConnectionId feedId, JobId jobId, JobSpecification jobSpec,
-                Map<String, String> feedPolicy) {
-            if (registeredFeeds.containsKey(jobId)) {
-                throw new IllegalStateException(" Feed already registered ");
-            }
-            registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy, jobId));
-        }
-
-        public void deregisterFeed(JobId jobId) {
-            FeedInfo feedInfo = registeredFeeds.remove(jobId);
-            if (feedInfo != null) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("DeRegistered Feed Info :" + feedInfo);
-                }
-            }
-        }
-
-        public void deregisterFeed(FeedInfo feedInfo) {
-            JobId jobId = feedInfo.jobId;
-            deregisterFeed(jobId);
-        }
-
-        @Override
-        public void run() {
-            Message mesg;
-            while (true) {
-                try {
-                    mesg = inbox.take();
-                    FeedInfo feedInfo = registeredFeeds.get(mesg.jobId);
-                    switch (mesg.messageKind) {
-                        case JOB_START:
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Job started for feed id" + feedInfo.feedConnectionId);
-                            }
-                            handleJobStartMessage(feedInfo, mesg);
-                            break;
-                        case JOB_FINISH:
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Job finished for feed id" + feedInfo.feedConnectionId);
-                            }
-                            handleJobFinishMessage(feedInfo, mesg);
-                            deregisterFeed(mesg.jobId);
-                            break;
-                    }
-                } catch (InterruptedException e) {
-                    // restore the interrupt status and exit the event loop
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-
-            }
-        }
-
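-        /**
-         * On job start, classifies the job's operators into ingest, compute, and storage operators,
-         * resolves their runtime locations from the Hyracks job info, records those locations in the
-         * feed activity details, optionally configures a super feed manager, and logs a FEED_BEGIN
-         * activity in the metadata.
-         */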
-        private void handleJobStartMessage(FeedInfo feedInfo, Message message) {
-
-            JobSpecification jobSpec = feedInfo.jobSpec;
-
-            List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
-            List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
-            List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
-
-            Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
-            for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-                IOperatorDescriptor opDesc = entry.getValue();
-                IOperatorDescriptor actualOp = null;
-                if (opDesc instanceof FeedMetaOperatorDescriptor) {
-                    actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
-                } else {
-                    actualOp = opDesc;
-                }
-
-                if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
-                    AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) actualOp);
-                    IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
-                    for (IPushRuntimeFactory rf : runtimeFactories) {
-                        if (rf instanceof AssignRuntimeFactory) {
-                            computeOperatorIds.add(entry.getKey());
-                        }
-                    }
-                } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
-                    storageOperatorIds.add(entry.getKey());
-                } else if (actualOp instanceof FeedIntakeOperatorDescriptor) {
-                    ingestOperatorIds.add(entry.getKey());
-                }
-            }
-
-            try {
-                IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-                JobInfo info = hcc.getJobInfo(message.jobId);
-                feedInfo.jobInfo = info;
-                Map<String, String> feedActivityDetails = new HashMap<String, String>();
-                StringBuilder ingestLocs = new StringBuilder();
-                for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
-                    Map<Integer, String> operatorLocations = info.getOperatorLocations().get(ingestOpId);
-                    int nOperatorInstances = operatorLocations.size();
-                    for (int i = 0; i < nOperatorInstances; i++) {
-                        feedInfo.ingestLocations.add(operatorLocations.get(i));
-                    }
-                }
-                StringBuilder computeLocs = new StringBuilder();
-                for (OperatorDescriptorId computeOpId : computeOperatorIds) {
-                    Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
-                    if (operatorLocations != null) {
-                        int nOperatorInstances = operatorLocations.size();
-                        for (int i = 0; i < nOperatorInstances; i++) {
-                            feedInfo.computeLocations.add(operatorLocations.get(i));
-                        }
-                    } else {
-                        feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
-                    }
-                }
-
-                StringBuilder storageLocs = new StringBuilder();
-                for (OperatorDescriptorId storageOpId : storageOperatorIds) {
-                    Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
-                    int nOperatorInstances = operatorLocations.size();
-                    for (int i = 0; i < nOperatorInstances; i++) {
-                        feedInfo.storageLocations.add(operatorLocations.get(i));
-                    }
-                }
-
-                ingestLocs.append(StringUtils.join(feedInfo.ingestLocations, ","));
-                computeLocs.append(StringUtils.join(feedInfo.computeLocations, ","));
-                storageLocs.append(StringUtils.join(feedInfo.storageLocations, ","));
-
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
-                String policyName = feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
-
-                FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedInfo.feedPolicy);
-                if (policyAccessor.collectStatistics() || policyAccessor.isElastic()) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Feed " + feedInfo.feedConnectionId + " requires Super Feed Manager");
-                    }
-                    configureSuperFeedManager(feedInfo, feedActivityDetails);
-                }
-
-                MetadataManager.INSTANCE.acquireWriteLatch();
-                MetadataTransactionContext mdTxnCtx = null;
-                try {
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    FeedActivity fa = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
-                            feedInfo.feedConnectionId, null);
-                    FeedActivityType nextState = FeedActivityType.FEED_BEGIN;
-                    FeedActivity feedActivity = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
-                            feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
-                            nextState, feedActivityDetails);
-                    MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, feedInfo.feedConnectionId, feedActivity);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e) {
-                    if (mdTxnCtx != null) {
-                        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                    }
-                } finally {
-                    MetadataManager.INSTANCE.releaseWriteLatch();
-                }
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in handling job start message for feed " + feedInfo.feedConnectionId
-                            + ": " + e.getMessage());
-                }
-            }
-
-        }
-
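-        /**
-         * Picks a random ingest location to host the super feed manager, resolves its cluster IP,
-         * records the host and port in the feed activity details, and queues an election message
-         * for delivery to the feed's locations.
-         */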
-        private void configureSuperFeedManager(FeedInfo feedInfo, Map<String, String> feedActivityDetails) {
-            // pick a random ingest location to host the super feed manager
-            int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
-            String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
-
-            Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
-            String instanceName = cluster.getInstanceName();
-            String node = superFeedManagerHost.substring(instanceName.length() + 1);
-            String hostIp = null;
-            for (Node n : cluster.getNode()) {
-                if (n.getId().equals(node)) {
-                    hostIp = n.getClusterIp();
-                    break;
-                }
-            }
-            if (hostIp == null) {
-                throw new IllegalStateException("Unknown node " + superFeedManagerHost);
-            }
-
-            feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
-            feedActivityDetails
-                    .put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, "" + superFeedManagerPort);
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
-                        + superFeedManagerHost);
-            }
-
-            FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
-                    superFeedManagerPort, feedInfo.feedConnectionId);
-            superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
-            messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
-
-        }
-
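-        /**
-         * On job finish, records a FEED_END (or FEED_FAILURE) activity in the metadata, unless the
-         * job ended because a node was lost after submission, in which case an attempt is made to
-         * revive the feed.
-         */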
-        private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
-            boolean feedFailedDueToPostSubmissionNodeLoss = verifyReasonForFailure(feedInfo);
-            if (!feedFailedDueToPostSubmissionNodeLoss) {
-                // the write latch is acquired only on this path and released in the finally block below
-                MetadataManager.INSTANCE.acquireWriteLatch();
-                MetadataTransactionContext mdTxnCtx = null;
-                try {
-                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-                    JobInfo info = hcc.getJobInfo(message.jobId);
-                    JobStatus status = info.getStatus();
-                    boolean failure = status != null && status.equals(JobStatus.FAILURE);
-                    FeedActivityType activityType = FeedActivityType.FEED_END;
-                    Map<String, String> details = new HashMap<String, String>();
-                    if (failure) {
-                        activityType = FeedActivityType.FEED_FAILURE;
-                    }
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    FeedActivity feedActivity = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
-                            feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
-                            activityType, details);
-                    MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedConnectionId(
-                            feedInfo.feedConnectionId.getDataverse(), feedInfo.feedConnectionId.getFeedName(),
-                            feedInfo.feedConnectionId.getDatasetName()), feedActivity);
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (RemoteException | ACIDException | MetadataException e) {
-                    try {
-                        if (mdTxnCtx != null) {
-                            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                        }
-                    } catch (RemoteException | ACIDException ae) {
-                        throw new IllegalStateException("Unable to abort metadata transaction", ae);
-                    }
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in handling job fninsh message " + message.jobId + "["
-                                + message.messageKind + "]" + " for job " + message.jobId);
-                    }
-                } finally {
-                    MetadataManager.INSTANCE.releaseWriteLatch();
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Attempt to revive feed");
-                }
-                FeedsActivator activator = new FeedsActivator();
-                String dataverse = feedInfo.feedConnectionId.getDataverse();
-                String datasetName = feedInfo.feedConnectionId.getDatasetName();
-                String feedName = feedInfo.feedConnectionId.getFeedName();
-                String feedPolicy = feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-                activator.reviveFeed(dataverse, feedName, datasetName, feedPolicy);
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Revived Feed");
-                }
-
-            }
-        }
-
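-        /**
-         * Compares the job's partition-location constraints against the current set of participant
-         * nodes; returns true if any node hosting the job left the cluster after the job was
-         * submitted.
-         */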
-        private boolean verifyReasonForFailure(FeedInfo feedInfo) {
-            JobSpecification spec = feedInfo.jobSpec;
-            Set<Constraint> userConstraints = spec.getUserConstraints();
-            List<String> locations = new ArrayList<String>();
-            for (Constraint constraint : userConstraints) {
-                LValueConstraintExpression lexpr = constraint.getLValue();
-                ConstraintExpression cexpr = constraint.getRValue();
-                switch (lexpr.getTag()) {
-                    case PARTITION_LOCATION:
-                        String location = (String) ((ConstantExpression) cexpr).getValue();
-                        locations.add(location);
-                        break;
-                }
-            }
-            Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
-            List<String> nodesFailedPostSubmission = new ArrayList<String>();
-            for (String location : locations) {
-                if (!participantNodes.contains(location)) {
-                    nodesFailedPostSubmission.add(location);
-                }
-            }
-
-            if (!nodesFailedPostSubmission.isEmpty()) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Feed failed as node(s) " + nodesFailedPostSubmission + " failed post submission");
-                }
-                return true;
-            } else {
-                return false;
-            }
-
-        }
-
-        public static class FeedMessengerMessage {
-            private final IFeedMessage message;
-            private final FeedInfo feedInfo;
-
-            public FeedMessengerMessage(IFeedMessage message, FeedInfo feedInfo) {
-                this.message = message;
-                this.feedInfo = feedInfo;
-            }
-
-            public IFeedMessage getMessage() {
-                return message;
-            }
-
-            public FeedInfo getFeedInfo() {
-                return feedInfo;
-            }
-        }
-
-        private static class FeedMessenger implements Runnable {
-
-            private final LinkedBlockingQueue<FeedMessengerMessage> inbox;
-
-            public FeedMessenger(LinkedBlockingQueue<FeedMessengerMessage> inbox) {
-                this.inbox = inbox;
-            }
-
-            public void run() {
-                while (true) {
-                    FeedMessengerMessage message = null;
-                    try {
-                        message = inbox.take();
-                        FeedInfo feedInfo = message.getFeedInfo();
-                        switch (message.getMessage().getMessageType()) {
-                            case SUPER_FEED_MANAGER_ELECT:
-                                Thread.sleep(2000);
-                                sendSuperFeedManagerElectMessage(feedInfo,
-                                        (FeedManagerElectMessage) message.getMessage());
-                                if (LOGGER.isLoggable(Level.WARNING)) {
-                                    LOGGER.warning("Sent super feed manager election message: " + message.getMessage());
-                                }
-                                break;
-                        }
-                    } catch (InterruptedException ie) {
-                        break;
-                    }
-                }
-            }
-
-        }
-    }
-
-    public static class FeedInfo {
-        public FeedConnectionId feedConnectionId;
-        public JobSpecification jobSpec;
-        public List<String> ingestLocations = new ArrayList<String>();
-        public List<String> computeLocations = new ArrayList<String>();
-        public List<String> storageLocations = new ArrayList<String>();
-        public JobInfo jobInfo;
-        public Map<String, String> feedPolicy;
-        public JobId jobId;
-
-        public FeedInfo(FeedConnectionId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy, JobId jobId) {
-            this.feedConnectionId = feedId;
-            this.jobSpec = jobSpec;
-            this.feedPolicy = feedPolicy;
-            this.jobId = jobId;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof FeedInfo)) {
-                return false;
-            }
-            return ((FeedInfo) o).feedConnectionId.equals(feedConnectionId);
-        }
-
-        @Override
-        public int hashCode() {
-            return feedConnectionId.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return feedConnectionId + " job id " + jobId;
-        }
-
-    }
-
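-    /**
-     * Builds a failure report that maps each affected feed to the ingestion, compute, or storage
-     * failures caused by the dead nodes, then delegates recovery decisions to handleFailure.
-     */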
-    @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        Collection<FeedInfo> feedInfos = feedJobNotificationHandler.registeredFeeds.values();
-        FeedFailureReport failureReport = new FeedFailureReport();
-        for (FeedInfo feedInfo : feedInfos) {
-            for (String deadNodeId : deadNodeIds) {
-                if (feedInfo.ingestLocations.contains(deadNodeId)) {
-                    List<FeedFailure> failures = failureReport.failures.get(feedInfo);
-                    if (failures == null) {
-                        failures = new ArrayList<FeedFailure>();
-                        failureReport.failures.put(feedInfo, failures);
-                    }
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Inestion Node Failure! " + deadNodeId);
-                    }
-                    failures.add(new FeedFailure(FeedFailure.FailureType.INGESTION_NODE, deadNodeId));
-                }
-                if (feedInfo.computeLocations.contains(deadNodeId)) {
-                    List<FeedFailure> failures = failureReport.failures.get(feedInfo);
-                    if (failures == null) {
-                        failures = new ArrayList<FeedFailure>();
-                        failureReport.failures.put(feedInfo, failures);
-                    }
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Compute Node Failure! " + deadNodeId);
-                    }
-                    failures.add(new FeedFailure(FeedFailure.FailureType.COMPUTE_NODE, deadNodeId));
-                }
-                if (feedInfo.storageLocations.contains(deadNodeId)) {
-                    List<FeedFailure> failures = failureReport.failures.get(feedInfo);
-                    if (failures == null) {
-                        failures = new ArrayList<FeedFailure>();
-                        failureReport.failures.put(feedInfo, failures);
-                    }
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Storage Node Failure! " + deadNodeId);
-                    }
-                    failures.add(new FeedFailure(FeedFailure.FailureType.STORAGE_NODE, deadNodeId));
-                }
-            }
-        }
-        if (failureReport.failures.isEmpty()) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                StringBuilder builder = new StringBuilder();
-                builder.append("No feed is affected by the failure of node(s): ");
-                for (String deadNodeId : deadNodeIds) {
-                    builder.append(deadNodeId + " ");
-                }
-                LOGGER.info(builder.toString());
-            }
-            return new HashSet<IClusterManagementWork>();
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                StringBuilder builder = new StringBuilder();
-                builder.append("Feed(s) affected by the failure of node(s): ");
-                for (String deadNodeId : deadNodeIds) {
-                    builder.append(deadNodeId + " ");
-                }
-                builder.append("\n");
-                for (FeedInfo fInfo : failureReport.failures.keySet()) {
-                    builder.append(fInfo.feedConnectionId + " ");
-                }
-                LOGGER.warning(builder.toString());
-            }
-            // deregistration must happen regardless of the logging level
-            for (FeedInfo fInfo : failureReport.failures.keySet()) {
-                feedJobNotificationHandler.deregisterFeed(fInfo);
-            }
-            return handleFailure(failureReport);
-        }
-    }
-
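-    /**
-     * Consults each affected feed's policy: feeds that are not required to survive hardware failure,
-     * or that lost a storage node, are terminated; for the remaining ingest/compute failures, a
-     * request for replacement nodes (AddNodeWork) is submitted to the cluster manager.
-     */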
-    private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
-        reportFeedFailure(failureReport);
-        Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
-        Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
-        FeedPolicyAccessor fpa = null;
-        List<FeedInfo> feedsToTerminate = new ArrayList<FeedInfo>();
-        for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
-            FeedInfo feedInfo = entry.getKey();
-            fpa = new FeedPolicyAccessor(feedInfo.feedPolicy);
-            if (!fpa.continueOnHardwareFailure()) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Feed " + feedInfo.feedConnectionId + " is governed by policy "
-                            + feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
-                    LOGGER.warning("Feed policy does not require feed to recover from hardware failure. Feed will terminate");
-                }
-                continue;
-            } else {
-                // the feed's policy permits recovery from hardware failure; attempt to recover
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Feed " + feedInfo.feedConnectionId + " is governed by policy "
-                            + feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
-                    LOGGER.info("Feed policy requires feed to recover from hardware failure. Attempting to recover feed");
-                }
-            }
-
-            List<FeedFailure> feedFailures = entry.getValue();
-            boolean recoveryPossible = true;
-            for (FeedFailure feedFailure : feedFailures) {
-                switch (feedFailure.failureType) {
-                    case COMPUTE_NODE:
-                    case INGESTION_NODE:
-                        Map<FeedInfo, List<FailureType>> failuresBecauseOfThisNode = failureMap.get(feedFailure.nodeId);
-                        if (failuresBecauseOfThisNode == null) {
-                            failuresBecauseOfThisNode = new HashMap<FeedInfo, List<FailureType>>();
-                            failuresBecauseOfThisNode.put(feedInfo, new ArrayList<FailureType>());
-                            failureMap.put(feedFailure.nodeId, failuresBecauseOfThisNode);
-                        }
-                        List<FailureType> feedF = failuresBecauseOfThisNode.get(feedInfo);
-                        if (feedF == null) {
-                            feedF = new ArrayList<FailureType>();
-                            failuresBecauseOfThisNode.put(feedInfo, feedF);
-                        }
-                        feedF.add(feedFailure.failureType);
-                        break;
-                    case STORAGE_NODE:
-                        recoveryPossible = false;
-                        if (LOGGER.isLoggable(Level.SEVERE)) {
-                            LOGGER.severe("Unrecoverable situation! lost storage node for the feed "
-                                    + feedInfo.feedConnectionId);
-                        }
-                        List<String> requiredNodeIds = dependentFeeds.get(feedInfo);
-                        if (requiredNodeIds == null) {
-                            requiredNodeIds = new ArrayList<String>();
-                            dependentFeeds.put(feedInfo, requiredNodeIds);
-                        }
-                        requiredNodeIds.add(feedFailure.nodeId);
-                        failuresBecauseOfThisNode = failureMap.get(feedFailure.nodeId);
-                        if (failuresBecauseOfThisNode != null) {
-                            failuresBecauseOfThisNode.remove(feedInfo);
-                            if (failuresBecauseOfThisNode.isEmpty()) {
-                                failureMap.remove(feedFailure.nodeId);
-                            }
-                        }
-                        feedsToTerminate.add(feedInfo);
-                        break;
-                }
-            }
-            if (!recoveryPossible) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Terminating irrecoverable feed (loss of storage node) ");
-                }
-            }
-        }
-
-        if (!feedsToTerminate.isEmpty()) {
-            Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
-            t.start();
-        }
-
-        int numRequiredNodes = 0;
-        for (Entry<String, Map<FeedInfo, List<FeedFailure.FailureType>>> entry : failureMap.entrySet()) {
-            Map<FeedInfo, List<FeedFailure.FailureType>> v = entry.getValue();
-            for (FeedInfo finfo : feedsToTerminate) {
-                v.remove(finfo);
-            }
-            if (v.size() > 0) {
-                numRequiredNodes++;
-            }
-        }
-
-        if (numRequiredNodes > 0) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Number of additional nodes requested " + numRequiredNodes);
-            }
-            AddNodeWork addNodesWork = new AddNodeWork(numRequiredNodes, this);
-            work.add(addNodesWork);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                Map<FeedInfo, List<FeedFailure>> feedFailures = failureReport.failures;
-                for (Entry<FeedInfo, List<FeedFailure>> entry : feedFailures.entrySet()) {
-                    for (FeedFailure f : entry.getValue()) {
-                        LOGGER.info("Feed Failure! " + f.failureType + " " + f.nodeId);
-                    }
-                }
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered work id: " + addNodesWork.getWorkId());
-            }
-            feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Not requesting any new node. Feeds unrecoverable until the lost node(s) rejoin");
-            }
-        }
-        return work;
-    }
-
-    private void reportFeedFailure(FeedFailureReport failureReport) {
-        MetadataTransactionContext ctx = null;
-        FeedActivity fa = null;
-        Map<String, String> feedActivityDetails = new HashMap<String, String>();
-        StringBuilder builder = new StringBuilder();
-        MetadataManager.INSTANCE.acquireWriteLatch();
-        try {
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            for (Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
-                FeedInfo feedInfo = entry.getKey();
-                List<FeedFailure> feedFailures = entry.getValue();
-                for (FeedFailure failure : feedFailures) {
-                    builder.append(failure + ",");
-                }
-                builder.deleteCharAt(builder.length() - 1);
-                feedActivityDetails.put(FeedActivityDetails.FEED_NODE_FAILURE, builder.toString());
-                fa = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
-                        feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
-                        FeedActivityType.FEED_FAILURE, feedActivityDetails);
-                MetadataManager.INSTANCE.registerFeedActivity(ctx, feedInfo.feedConnectionId, fa);
-            }
-            MetadataManager.INSTANCE.commitTransaction(ctx);
-        } catch (Exception e) {
-            if (ctx != null) {
-                try {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    throw new IllegalStateException("Unable to abort transaction " + e2);
-                }
-            }
-        } finally {
-            MetadataManager.INSTANCE.releaseWriteLatch();
-        }
-    }
-
-    private static void sendSuperFeedManagerElectMessage(FeedInfo feedInfo, FeedManagerElectMessage electMessage) {
-        try {
-            Dataverse dataverse = new Dataverse(feedInfo.feedConnectionId.getDataverse(),
-                    NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, 0);
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse);
-            JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
-            IOperatorDescriptor feedMessenger;
-            AlgebricksPartitionConstraint messengerPc;
-            Set<String> locations = new HashSet<String>();
-            locations.addAll(feedInfo.computeLocations);
-            locations.addAll(feedInfo.ingestLocations);
-            locations.addAll(feedInfo.storageLocations);
-
-            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider.buildSendFeedMessageRuntime(
-                    spec, dataverse.getDataverseName(), feedInfo.feedConnectionId.getFeedName(),
-                    feedInfo.feedConnectionId.getDatasetName(), electMessage, locations.toArray(new String[] {}));
-            feedMessenger = p.first;
-            messengerPc = p.second;
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
-
-            NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
-            spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
-            spec.addRoot(nullSink);
-
-            JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(spec);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(" Super Feed Manager Message: " + electMessage + " Job Id " + jobId);
-            }
-
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.log(Level.INFO, "Exception in sending super feed manager elect message: "
-                        + feedInfo.feedConnectionId, e);
-            }
-
-    }
-
-    public static class FeedFailure {
-
-        public enum FailureType {
-            INGESTION_NODE,
-            COMPUTE_NODE,
-            STORAGE_NODE
-        }
-
-        public FailureType failureType;
-        public String nodeId;
-
-        public FeedFailure(FailureType failureType, String nodeId) {
-            this.failureType = failureType;
-            this.nodeId = nodeId;
-        }
-
-        @Override
-        public String toString() {
-            return failureType + " (" + nodeId + ") ";
-        }
-    }
-
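-    /**
-     * On a node join, either resumes all feeds (if the cluster just returned to the ACTIVE state)
-     * or revives the dependent feeds whose required nodes have all rejoined.
-     */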
-    @Override
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        State newState = AsterixClusterProperties.INSTANCE.getState();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
-        }
-
-        boolean needToReActivateFeeds = !newState.equals(state) && (newState == State.ACTIVE);
-        if (needToReActivateFeeds) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
-            }
-            try {
-                FeedsActivator activator = new FeedsActivator();
-                (new Thread(activator)).start();
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                }
-            }
-            state = newState;
-        } else {
-            List<FeedInfo> feedsThatCanBeRevived = new ArrayList<FeedInfo>();
-            for (Entry<FeedInfo, List<String>> entry : dependentFeeds.entrySet()) {
-                List<String> requiredNodeIds = entry.getValue();
-                if (requiredNodeIds.contains(joinedNodeId)) {
-                    requiredNodeIds.remove(joinedNodeId);
-                    if (requiredNodeIds.isEmpty()) {
-                        feedsThatCanBeRevived.add(entry.getKey());
-                    }
-                }
-            }
-            if (!feedsThatCanBeRevived.isEmpty()) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
-                }
-                FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
-                (new Thread(activator)).start();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        try {
-            responseInbox.put(response);
-        } catch (InterruptedException e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Interrupted exception");
-            }
-        }
-    }
-
-    @Override
-    public void notifyStateChange(State previousState, State newState) {
-        switch (newState) {
-            case ACTIVE:
-                if (previousState.equals(State.UNUSABLE)) {
-                    try {
-                        FeedsActivator activator = new FeedsActivator();
-                        (new Thread(activator)).start();
-                    } catch (Exception e) {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                        }
-                    }
-                }
-                break;
-        }
-
-    }
-
-    private static class FeedsActivator implements Runnable {
-
-        private List<FeedInfo> feedsToRevive;
-        private Mode mode;
-
-        public enum Mode {
-            REVIVAL_POST_CLUSTER_REBOOT,
-            REVIVAL_POST_NODE_REJOIN
-        }
-
-        public FeedsActivator() {
-            this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
-        }
-
-        public FeedsActivator(List<FeedInfo> feedsToRevive) {
-            this.feedsToRevive = feedsToRevive;
-            this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
-        }
-
-        @Override
-        public void run() {
-            switch (mode) {
-                case REVIVAL_POST_CLUSTER_REBOOT:
-                    revivePostClusterReboot();
-                    break;
-                case REVIVAL_POST_NODE_REJOIN:
-                    try {
-                        Thread.sleep(10000);
-                    } catch (InterruptedException e1) {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Attempt to resume feed interrupted");
-                        }
-                        Thread.currentThread().interrupt();
-                        throw new IllegalStateException(e1);
-                    }
-                    for (FeedInfo finfo : feedsToRevive) {
-                        try {
-                            JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
-                                LOGGER.info("Job:" + finfo.jobSpec);
-                            }
-                        } catch (Exception e) {
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " " + e.getMessage());
-                            }
-                        }
-                    }
-            }
-        }
-
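-        /**
-         * After a cluster reboot, looks up the feeds that were active prior to shutdown and
-         * reconnects those whose policy calls for an automatic restart.
-         */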
-        private void revivePostClusterReboot() {
-            MetadataTransactionContext ctx = null;
-
-            try {
-
-                Thread.sleep(4000);
-                MetadataManager.INSTANCE.init();
-                ctx = MetadataManager.INSTANCE.beginTransaction();
-                List<FeedActivity> activeFeeds = MetadataManager.INSTANCE.getActiveFeeds(ctx, null, null);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Attempt to resume feeds that were active prior to instance shutdown!");
-                    LOGGER.info("Number of feeds affected:" + activeFeeds.size());
-                    for (FeedActivity fa : activeFeeds) {
-                        LOGGER.info("Active feed " + fa.getDataverseName() + ":" + fa.getDatasetName());
-                    }
-                }
-                for (FeedActivity fa : activeFeeds) {
-                    String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
-                    FeedPolicy policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, fa.getDataverseName(), feedPolicy);
-                    if (policy == null) {
-                        policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
-                                feedPolicy);
-                        if (policy == null) {
-                            if (LOGGER.isLoggable(Level.SEVERE)) {
-                                LOGGER.severe("Unable to resume feed: " + fa.getDataverseName() + ":"
-                                        + fa.getDatasetName() + "." + " Unknown policy :" + feedPolicy);
-                            }
-                            continue;
-                        }
-                    }
-
-                    FeedPolicyAccessor fpa = new FeedPolicyAccessor(policy.getProperties());
-                    if (fpa.autoRestartOnClusterReboot()) {
-                        String dataverse = fa.getDataverseName();
-                        String datasetName = fa.getDatasetName();
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Resuming feed after cluster revival: " + dataverse + ":" + datasetName
-                                    + " using policy " + feedPolicy);
-                        }
-                        reviveFeed(dataverse, fa.getFeedName(), datasetName, feedPolicy);
-                    } else {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Feed " + fa.getDataverseName() + ":" + fa.getDatasetName()
-                                    + " governed by policy" + feedPolicy
-                                    + " does not state auto restart after cluster revival");
-                        }
-                    }
-                }
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Exception in reviving feeds post cluster reboot", e);
-                }
-                try {
-                    if (ctx != null) {
-                        MetadataManager.INSTANCE.abortTransaction(ctx);
-                    }
-                } catch (Exception e1) {
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Exception in aborting transaction: " + e1.getMessage());
-                    }
-                    throw new IllegalStateException(e1);
-                }
-            }
-        }
-
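-        /**
-         * Reconnects a feed by programmatically compiling and executing the rough equivalent of:
-         *   use dataverse <dataverse>;
-         *   connect feed <feedName> to dataset <dataset> using policy <feedPolicy>;
-         * with force-connect enabled.
-         */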
-        public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
-            PrintWriter writer = new PrintWriter(System.out, true);
-            SessionConfig conf = new SessionConfig(writer, SessionConfig.OutputFormat.ADM);
-            try {
-                DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
-                ConnectFeedStatement stmt = new ConnectFeedStatement(new Identifier(dataverse),
-                        new Identifier(feedName), new Identifier(dataset), feedPolicy, 0);
-                stmt.setForceConnect(true);
-                List<Statement> statements = new ArrayList<Statement>();
-                statements.add(dataverseDecl);
-                statements.add(stmt);
-                AqlTranslator translator = new AqlTranslator(statements, conf);
-                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                        AqlTranslator.ResultDelivery.SYNC);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
-                }
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.log(Level.INFO, "Exception in resuming loser feed: " + dataverse + ":" + dataset
-                            + " using policy " + feedPolicy, e);
-                }
-            }
-        }
-    }
-
-    public static class FeedsDeActivator implements Runnable {
-
-        private List<FeedInfo> feedsToTerminate;
-
-        public FeedsDeActivator(List<FeedInfo> feedsToTerminate) {
-            this.feedsToTerminate = feedsToTerminate;
-        }
-
-        @Override
-        public void run() {
-            for (FeedInfo feedInfo : feedsToTerminate) {
-                endFeed(feedInfo);
-            }
-        }
-
-        private void endFeed(FeedInfo feedInfo) {
-            MetadataTransactionContext ctx = null;
-            PrintWriter writer = new PrintWriter(System.out, true);
-            SessionConfig conf = new SessionConfig(writer, SessionConfig.OutputFormat.ADM);
-            try {
-                ctx = MetadataManager.INSTANCE.beginTransaction();
-                DisconnectFeedStatement stmt = new DisconnectFeedStatement(new Identifier(
-                        feedInfo.feedConnectionId.getDataverse()), new Identifier(
-                        feedInfo.feedConnectionId.getFeedName()), new Identifier(
-                        feedInfo.feedConnectionId.getDatasetName()));
-                List<Statement> statements = new ArrayList<Statement>();
-                DataverseDecl dataverseDecl = new DataverseDecl(
-                        new Identifier(feedInfo.feedConnectionId.getDataverse()));
-                statements.add(dataverseDecl);
-                statements.add(stmt);
-                AqlTranslator translator = new AqlTranslator(statements, conf);
-                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
-                        AqlTranslator.ResultDelivery.SYNC);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("End urecoverable feed: " + feedInfo.feedConnectionId);
-                }
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in ending unrecoverable feed: " + feedInfo.feedConnectionId + ": "
-                            + e.getMessage());
-                }
-                e.printStackTrace();
-                try {
-                    if (ctx != null) {
-                        MetadataManager.INSTANCE.abortTransaction(ctx);
-                    }
-                } catch (Exception e2) {
-                    e2.addSuppressed(e);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.severe("Exception in aborting transaction! System is in an inconsistent state: "
-                                + e2.getMessage());
-                    }
-                }
-
-            }
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
deleted file mode 100644
index fa27e7e..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.hyracks.bootstrap;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailureReport;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedInfo;
-import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedsDeActivator;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
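-/**
- * Consumes responses to cluster management work (e.g., ADD_NODE requests) from the
- * inbox and attempts to recover the feeds affected by a node failure, patching each
- * feed's job specification with replacement nodes before resubmitting it.
- */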
-public class FeedWorkRequestResponseHandler implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedWorkRequestResponseHandler.class.getName());
-
-    private final LinkedBlockingQueue<IClusterManagementWorkResponse> inbox;
-
-    private Map<Integer, FeedFailureReport> feedsWaitingForResponse = new HashMap<Integer, FeedFailureReport>();
-
-    public FeedWorkRequestResponseHandler(LinkedBlockingQueue<IClusterManagementWorkResponse> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            IClusterManagementWorkResponse response = null;
-            try {
-                response = inbox.take();
-            } catch (InterruptedException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Interrupted while waiting for a response: " + e.getMessage());
-                }
-                Thread.currentThread().interrupt();
-                break; // response is null here; restore the interrupt flag and stop the handler
-            }
-            IClusterManagementWork submittedWork = response.getWork();
-            switch (submittedWork.getClusterManagementWorkType()) {
-                case ADD_NODE:
-                    AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
-                    switch (resp.getStatus()) {
-                        case FAILURE:
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Request " + resp.getWork() + " not completed");
-                            }
-                            break;
-                        case SUCCESS:
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Request " + resp.getWork() + " completed");
-                            }
-                            break;
-                    }
-
-                    AddNodeWork work = (AddNodeWork) submittedWork;
-                    FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
-                    if (failureReport == null) {
-                        break; // no feed was waiting on this work item
-                    }
-                    Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
-                    for (FeedInfo feedInfo : affectedFeeds) {
-                        try {
-                            recoverFeed(feedInfo, work, resp, failureReport.failures.get(feedInfo));
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Recovered feed:" + feedInfo);
-                            }
-                        } catch (Exception e) {
-                            if (LOGGER.isLoggable(Level.SEVERE)) {
-                                LOGGER.severe("Unable to recover feed:" + feedInfo);
-                            }
-                        }
-                    }
-                    break;
-                case REMOVE_NODE:
-                    break;
-            }
-        }
-    }
-
-    private void recoverFeed(FeedInfo feedInfo, AddNodeWork work, AddNodeWorkResponse resp,
-            List<FeedFailure> feedFailures) throws Exception {
-        List<String> failedNodeIds = new ArrayList<String>();
-        for (FeedFailure feedFailure : feedFailures) {
-            failedNodeIds.add(feedFailure.nodeId);
-        }
-        List<String> chosenReplacements = new ArrayList<String>();
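-        // On FAILURE (no new nodes could be added), fall back to healthy nodes already
-        // in the cluster; on SUCCESS, map each failed ingestion node to a newly added
-        // node first, then to existing nodes once the new ones are exhausted.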
-        switch (resp.getStatus()) {
-            case FAILURE:
-                for (FeedFailure feedFailure : feedFailures) {
-                    switch (feedFailure.failureType) {
-                        case INGESTION_NODE:
-                            String replacement = getInternalReplacement(feedInfo, feedFailure, failedNodeIds,
-                                    chosenReplacements);
-                            chosenReplacements.add(replacement);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Existing  node:" + replacement + " chosen to replace "
-                                        + feedFailure.nodeId);
-                            }
-                            alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId, replacement);
-                            break;
-                    }
-                }
-                break;
-            case SUCCESS:
-                List<String> nodesAdded = resp.getNodesAdded();
-                int numNodesAdded = nodesAdded.size();
-                int nodeIndex = 0;
-                for (FeedFailure feedFailure : feedFailures) {
-                    switch (feedFailure.failureType) {
-                        case INGESTION_NODE:
-                            String replacement = null;
-                            if (nodeIndex <= numNodesAdded - 1) {
-                                replacement = nodesAdded.get(nodeIndex);
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    LOGGER.info("Newly added node:" + replacement + " chosen to replace "
-                                            + feedFailure.nodeId);
-                                }
-                            } else {
-                                replacement = getInternalReplacement(feedInfo, feedFailure, failedNodeIds,
-                                        chosenReplacements);
-                                if (LOGGER.isLoggable(Level.INFO)) {
-                                    LOGGER.info("Existing node:" + replacement + " chosen to replace "
-                                            + feedFailure.nodeId);
-                                }
-                                chosenReplacements.add(replacement);
-                            }
-                            alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId, replacement);
-                            nodeIndex++;
-                            break;
-                        default: // Ingestion nodes and compute nodes coincide in the current implementation,
-                                 // so correcting an ingestion node failure also takes care of compute node
-                                 // failures. Storage node failures cannot be recovered from, as the current
-                                 // implementation does not have data replication.
-                    }
-                }
-                break;
-        }
-
-        JobSpecification spec = feedInfo.jobSpec;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Final recovery job spec:\n" + spec);
-        }
-        Thread.sleep(5000); // grace period, presumably to let the replacement node(s) finish joining
-        AsterixAppContextInfo.getInstance().getHcc().startJob(spec);
-    }
-
-    private String getInternalReplacement(FeedInfo feedInfo, FeedFailure feedFailure, List<String> failedNodeIds,
-            List<String> chosenReplacements) {
-        String failedNodeId = feedFailure.nodeId;
-        String replacement = null;
-        // TODO 1st preference is given to any other participant node that is not involved in the feed.
-        //      2nd preference is given to a compute node.
-        //      3rd preference is given to a storage node
-        Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
-        if (participantNodes != null && !participantNodes.isEmpty()) {
-            List<String> pNodesClone = new ArrayList<String>();
-            pNodesClone.addAll(participantNodes);
-            pNodesClone.removeAll(feedInfo.storageLocations);
-            pNodesClone.removeAll(feedInfo.computeLocations);
-            pNodesClone.removeAll(feedInfo.ingestLocations);
-            pNodesClone.removeAll(chosenReplacements);
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                for (String candidateNode : pNodesClone) {
-                    LOGGER.info("Candidate for replacement:" + candidateNode);
-                }
-            }
-            if (!pNodesClone.isEmpty()) {
-                String[] participantNodesArray = pNodesClone.toArray(new String[] {});
-
-                replacement = participantNodesArray[new Random().nextInt(participantNodesArray.length)];
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Participant Node: " + replacement + " chosen as replacement for " + failedNodeId);
-                }
-            }
-        }
-
-        if (replacement == null) {
-            feedInfo.computeLocations.removeAll(failedNodeIds);
-            boolean computeNodeSubstitute = (feedInfo.computeLocations.size() > 1);
-            if (computeNodeSubstitute) {
-                replacement = feedInfo.computeLocations.get(0);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Compute node:" + replacement + " chosen to replace " + failedNodeId);
-                }
-            } else {
-                replacement = feedInfo.storageLocations.get(0);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Storage node:" + replacement + " chosen to replace " + failedNodeId);
-                }
-            }
-        }
-        return replacement;
-    }
-
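-    /**
-     * Patches the feed's job spec to use the replacement node; if no replacement was
-     * found, schedules the feed for termination instead.
-     */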
-    private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId, String replacement) {
-        if (replacement == null) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe("Unable to find replacement for failed node :" + failedNodeId);
-                LOGGER.severe("Feed: " + feedInfo.feedConnectionId + " will be terminated");
-            }
-            List<FeedInfo> feedsToTerminate = new ArrayList<FeedInfo>();
-            feedsToTerminate.add(feedInfo);
-            Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
-            t.start();
-        } else {
-            replaceNode(feedInfo.jobSpec, failedNodeId, replacement);
-        }
-    }
-
-    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
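-        // Rewrite the spec's location constraints: every PARTITION_LOCATION constraint
-        // pinned to the failed node is redirected to the replacement node, and the
-        // surviving locations of affected operators are collected so that a complete
-        // partition-to-node mapping can be re-asserted afterwards.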
-        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
-        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
-        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
-        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
-        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
-        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
-        OperatorDescriptorId opId = null;
-        for (Constraint constraint : userConstraints) {
-            LValueConstraintExpression lexpr = constraint.getLValue();
-            ConstraintExpression cexpr = constraint.getRValue();
-            switch (lexpr.getTag()) {
-                case PARTITION_COUNT:
-                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    if (modifiedOperators.contains(opId)) {
-                        countConstraintsToReplace.add(constraint);
-                    } else {
-                        List<Constraint> clist = candidateConstraints.get(opId);
-                        if (clist == null) {
-                            clist = new ArrayList<Constraint>();
-                            candidateConstraints.put(opId, clist);
-                        }
-                        clist.add(constraint);
-                    }
-                    break;
-                case PARTITION_LOCATION:
-                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                    if (oldLocation.equals(failedNodeId)) {
-                        locationConstraintsToReplace.add(constraint);
-                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
-                        Map<Integer, String> newLocs = newConstraints.get(opId);
-                        if (newLocs == null) {
-                            newLocs = new HashMap<Integer, String>();
-                            newConstraints.put(opId, newLocs);
-                        }
-                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                        newLocs.put(partition, replacementNode);
-                    } else {
-                        if (modifiedOperators.contains(opId)) {
-                            locationConstraintsToReplace.add(constraint);
-                            Map<Integer, String> newLocs = newConstraints.get(opId);
-                            if (newLocs == null) {
-                                newLocs = new HashMap<Integer, String>();
-                                newConstraints.put(opId, newLocs);
-                            }
-                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
-                            newLocs.put(partition, oldLocation);
-                        } else {
-                            List<Constraint> clist = candidateConstraints.get(opId);
-                            if (clist == null) {
-                                clist = new ArrayList<Constraint>();
-                                candidateConstraints.put(opId, clist);
-                            }
-                            clist.add(constraint);
-                        }
-                    }
-                    break;
-            }
-        }
-
-        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
-        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
-
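-        // Carry over the surviving location constraints of each modified operator so
-        // its full set of partition locations is known before being re-asserted.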
-        for (OperatorDescriptorId mopId : modifiedOperators) {
-            List<Constraint> clist = candidateConstraints.get(mopId);
-            if (clist != null && !clist.isEmpty()) {
-                jobSpec.getUserConstraints().removeAll(clist);
-
-                for (Constraint c : clist) {
-                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
-                        ConstraintExpression cexpr = c.getRValue();
-                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
-                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                        newConstraints.get(mopId).put(partition, oldLocation);
-                    }
-                }
-            }
-        }
-
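-        // Re-assert one absolute location constraint per modified operator, covering
-        // all of its partitions (partition i runs at locations[i]).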
-        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
-            OperatorDescriptorId nopId = entry.getKey();
-            Map<Integer, String> clist = entry.getValue();
-            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
-            String[] locations = new String[clist.size()];
-            for (int i = 0; i < locations.length; i++) {
-                locations[i] = clist.get(i);
-            }
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
-        }
-
-    }
-
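-    /**
-     * Registers a failure report so that it can later be matched, by work id, with
-     * the cluster-work response arriving on the inbox.
-     */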
-    public void registerFeedWork(int workId, FeedFailureReport failureReport) {
-        feedsWaitingForResponse.put(workId, failureReport);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 4ee37ba..8eb186e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -19,7 +19,6 @@ import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index d29f881..3e5b734 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -114,7 +114,7 @@ public class ExecutionTest {
     @Parameters
     public static Collection<Object[]> tests() throws Exception {
         Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.ONLY_TESTSUITE_XML_NAME);
-        if (testArgs.size() == 0){
+        if (testArgs.size() == 0) {
             testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
         }
         return testArgs;
@@ -138,6 +138,6 @@ public class ExecutionTest {
 
     @Test
     public void test() throws Exception {
-        TestsUtils.executeTest(PATH_ACTUAL, tcCtx, null, false);
+        TestsUtils.executeTest(PATH_ACTUAL, tcCtx, null, false);
     }
 }