You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:31:57 UTC

[03/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
deleted file mode 100644
index 5085087..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.CollectionRuntime;
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager.State;
-import org.apache.asterix.common.feeds.api.IDataSourceAdapter;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * The runtime for the feed intake operator descriptor (named FeedIntakeOperationDescriptor in the original comment).
- * Provides the core functionality to set up the artifacts for ingestion of a feed.
- * The artifacts are lazily activated when a feed receives a subscription request.
- */
-public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
-    private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName()); // NOTE(review): not final
-
-    private final FeedId feedId; // feed being ingested; used for the frame writer, runtime id and log messages
-    private final int partition; // partition index handed to the adapter factory, frame writer and runtime id
-    private final IFeedSubscriptionManager feedSubscriptionManager; // used to register/deregister the ingestion runtime
-    private final IFeedManager feedManager; // passed to the DistributeFeedFrameWriter
-    private final IHyracksTaskContext ctx;
-    private final IAdapterFactory adapterFactory; // creates the adapter when no ingestion runtime was supplied
-
-    private IngestionRuntime ingestionRuntime; // null => initialize() builds one; non-null => initialize() reactivates it
-    private IDataSourceAdapter adapter; // created by adapterFactory, or taken from the supplied runtime's adapter manager
-    private IIntakeProgressTracker tracker; // never assigned in this class (record tracking is commented out below)
-    private DistributeFeedFrameWriter feedFrameWriter; // created in initialize(), or taken from the supplied runtime
-    /** Stores the arguments and looks up the feed manager and its subscription manager; policyAccessor is not used. */
-    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
-            int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
-        this.ctx = ctx;
-        this.feedId = feedId;
-        this.partition = partition;
-        this.ingestionRuntime = ingestionRuntime;
-        this.adapterFactory = adapterFactory;
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedSubscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
-        this.feedManager = runtimeCtx.getFeedManager();
-    }
-    /** Creates or reactivates the ingestion runtime, then blocks until ingestion is finished or failed. */
-    @Override
-    public void initialize() throws HyracksDataException {
-        IAdapterRuntimeManager adapterRuntimeManager = null; // only assigned in the "create" branch below
-        try {
-            if (ingestionRuntime == null) { // no runtime supplied: build adapter, frame writer and runtime, then register
-                try {
-                    adapter = adapterFactory.createAdapter(ctx, partition);
-                    //TODO: Fix record tracking
-                    //                    if (adapterFactory.isRecordTrackingEnabled()) {
-                    //                        tracker = adapterFactory.createIntakeProgressTracker();
-                    //                    }
-                } catch (Exception e) {
-                    LOGGER.severe("Unable to create adapter : " + adapterFactory.getAlias() + "[" + partition + "]"
-                            + " Exception " + e);
-                    throw new HyracksDataException(e);
-                }
-                FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
-                feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
-                        fta, feedManager);
-                adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
-                SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
-                        partition);
-                ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
-                        adapterRuntimeManager);
-                feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
-                feedFrameWriter.open();
-            } else { // a runtime was supplied: accepted only if its adapter manager is in INACTIVE_INGESTION
-                if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
-                    ingestionRuntime.getAdapterRuntimeManager().setState(State.ACTIVE_INGESTION);
-                    adapter = ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
-                                + ingestionRuntime);
-                        LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
-                                + " connected to backend for feed " + feedId);
-                    }
-                    feedFrameWriter = ingestionRuntime.getFeedFrameWriter();
-                } else {
-                    String message = "Feed Ingestion Runtime for feed " + feedId
-                            + " is already registered and is active!.";
-                    LOGGER.severe(message);
-                    throw new IllegalStateException(message);
-                }
-            }
-            // NOTE(review): on the reactivation path adapterRuntimeManager is still null here; synchronizing on it below would throw NullPointerException - confirm.
-            waitTillIngestionIsOver(adapterRuntimeManager);
-            feedSubscriptionManager
-                    .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
-            if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
-                throw new HyracksDataException("Unable to ingest data");
-            }
-
-        } catch (InterruptedException ie) {
-            /*
-             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
-             * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
-             * The surviving intake partitions must continue to live and receive data from the external source.
-             */
-            List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
-            FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
-            boolean needToHandleFailure = false; // becomes true if any subscriber's policy continues on hardware failure
-            List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
-            for (ISubscriberRuntime subscriber : subscribers) {
-                policyAccessor.reset(subscriber.getFeedPolicy());
-                if (!policyAccessor.continueOnHardwareFailure()) {
-                    failingSubscribers.add(subscriber);
-                } else {
-                    needToHandleFailure = true;
-                }
-            }
-            // Unsubscribe the subscribers whose policy does not continue on hardware failure; errors are logged and ignored.
-            for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
-                try {
-                    ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning(
-                                "Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
-                    }
-                }
-            }
-            // If a subscriber remains, switch to INACTIVE_INGESTION without throwing; otherwise deregister and rethrow.
-            if (needToHandleFailure) {
-                ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(
-                            "Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
-                }
-                feedSubscriptionManager
-                        .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
-                throw new HyracksDataException(ie);
-            }
-        } catch (Exception e) {
-            e.printStackTrace(); // NOTE(review): writes to stderr instead of going through LOGGER
-            throw new HyracksDataException(e);
-        } finally {
-            if (ingestionRuntime != null
-                    && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
-                feedFrameWriter.close(); // NOTE(review): feedFrameWriter is null if the "already registered and is active" path threw above - confirm
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
-                            + ingestionRuntime.getAdapterRuntimeManager().getState());
-                }
-            } else { // NOTE(review): also taken when ingestionRuntime is null (adapter creation failed), despite the message below
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
-                            + " Will resume after correcting failure");
-                }
-            }
-
-        }
-    }
-    /** Waits on the given manager's monitor until its state is FINISHED_INGESTION or FAILED_INGESTION. */
-    private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Waiting for adaptor [" + partition + "]" + "to be done with ingestion of feed " + feedId);
-        }
-        synchronized (adapterRuntimeManager) {
-            while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION)
-                    || (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
-                adapterRuntimeManager.wait(); // presumably notified by the code that changes the state - not visible here
-            }
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
-                    + " done with ingestion of feed " + feedId);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
deleted file mode 100644
index 2add90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedLifecycleEventSubscriber.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
-
-public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
-    // Queues feed lifecycle events and lets a caller wait until an expected event has been received (see assertEvent).
-    private LinkedBlockingQueue<FeedLifecycleEvent> inbox; // filled by handleFeedEvent, read by assertEvent
-
-    public FeedLifecycleEventSubscriber() {
-        this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
-    }
-    /** Adds the event to the inbox. */
-    @Override
-    public void handleFeedEvent(FeedLifecycleEvent event) {
-        inbox.add(event);
-    }
-    /** Scans the queued events, then blocks taking further events until one equals the given event; a failure event raises AsterixException. */
-    @Override
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
-        boolean eventOccurred = false;
-        FeedLifecycleEvent e = null;
-        Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator(); // iterating here does not remove events from the queue
-        while (eventsSoFar.hasNext()) {
-            e = eventsSoFar.next();
-            assertNoFailure(e); // NOTE(review): unlike the loop below, this runs even when e is the awaited event
-            eventOccurred = e.equals(event); // NOTE(review): overwritten each iteration, so only the last queued event counts - confirm intended
-        }
-
-        while (!eventOccurred) {
-            e = inbox.take(); // blocks until an event is available and removes it from the queue
-            eventOccurred = e.equals(event);
-            if (!eventOccurred) { // the failure check is skipped for the awaited event itself
-                assertNoFailure(e);
-            }
-        }
-    }
-    /** Throws AsterixException if the event is FEED_INTAKE_FAILURE or FEED_COLLECT_FAILURE. */
-    private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
-        if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
-            throw new AsterixException("Failure in feed");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
deleted file mode 100644
index a1e9917..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * Sends a control message to the registered message queue for the feed connection identified by connectionId.
- */
-public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId; // connection the message is addressed to
-    private final IFeedMessage feedMessage; // message handed to the runtime created in createPushRuntime
-    /** Stores the connection id and the message; both are passed on to the node pushable. */
-    public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
-            IFeedMessage feedMessage) {
-        super(spec, 0, 1); // presumably (spec, input arity 0, output arity 1) - confirm against the Hyracks base class
-        this.connectionId = connectionId;
-        this.feedMessage = feedMessage;
-    }
-    /** Creates a FeedMessageOperatorNodePushable for the given partition; recordDescProvider is not used. */
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition, nPartitions);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
deleted file mode 100644
index 313fa1a..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.CollectionRuntime;
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedCollectRuntimeInputHandler;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedFrameCollector;
-import org.apache.asterix.common.feeds.FeedFrameCollector.State;
-import org.apache.asterix.common.feeds.FeedId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.FeedRuntimeManager;
-import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
-import org.apache.asterix.common.feeds.IngestionRuntime;
-import org.apache.asterix.common.feeds.IntakePartitionStatistics;
-import org.apache.asterix.common.feeds.MonitoredBufferTimerTasks.MonitoredBufferStorageTimerTask;
-import org.apache.asterix.common.feeds.StorageSideMonitoredBuffer;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.asterix.common.feeds.message.EndFeedMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-/**
- * Runtime for the FeedMessageOperatorDescriptor. This operator is responsible for communicating
- * a feed message to the local feed manager on the host node controller.
- *
- * @see FeedMessageOperatorDescriptor
- * @see IFeedMessage
- * @see IFeedManager
- */
-public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
-
-    private final FeedConnectionId connectionId; // connection supplied by the descriptor
-    private final IFeedMessage message; // message dispatched in initialize()
-    private final IFeedManager feedManager; // source of the connection and subscription managers used by the handlers
-    private final int partition; // partition used when building runtime ids for lookups
-    /** Stores the message and partition and obtains the feed manager from the application context; nPartitions is not used. */
-    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId connectionId,
-            IFeedMessage feedMessage, int partition, int nPartitions) {
-        this.connectionId = connectionId;
-        this.message = feedMessage;
-        this.partition = partition;
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-    }
-    /** Opens the output writer, dispatches on the message type to the matching handler, and closes the writer in all cases. */
-    @Override
-    public void initialize() throws HyracksDataException {
-        try {
-            writer.open();
-            switch (message.getMessageType()) {
-                case END:
-                    EndFeedMessage endFeedMessage = (EndFeedMessage) message;
-                    switch (endFeedMessage.getEndMessageType()) {
-                        case DISCONNECT_FEED:
-                            hanldeDisconnectFeedTypeMessage(endFeedMessage);
-                            break;
-                        case DISCONTINUE_SOURCE:
-                            handleDiscontinueFeedTypeMessage(endFeedMessage);
-                            break;
-                    }
-                    break;
-                case PREPARE_STALL: {
-                    handlePrepareStallMessage((PrepareStallMessage) message);
-                    break;
-                }
-                case TERMINATE_FLOW: {
-                    FeedConnectionId connectionId = ((TerminateDataFlowMessage) message).getConnectionId(); // shadows the connectionId field
-                    handleTerminateFlowMessage(connectionId);
-                    break;
-                }
-                case COMMIT_ACK_RESPONSE: {
-                    handleFeedTupleCommitResponseMessage((FeedTupleCommitResponseMessage) message);
-                    break;
-                }
-                case THROTTLING_ENABLED: {
-                    handleThrottlingEnabledMessage((ThrottlingEnabledFeedMessage) message);
-                    break;
-                }
-                default: // any other message type is ignored
-                    break;
-
-            }
-
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        } finally {
-            writer.close();
-        }
-    }
-    /** Turns acking off on the storage-side buffer of every STORE runtime of the message's connection. */
-    private void handleThrottlingEnabledMessage(ThrottlingEnabledFeedMessage throttlingMessage) {
-        FeedConnectionId connectionId = throttlingMessage.getConnectionId();
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
-        for (FeedRuntimeId runtimeId : runtimes) {
-            if (runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.STORE)) {
-                FeedRuntime storeRuntime = runtimeManager.getFeedRuntime(runtimeId);
-                ((StorageSideMonitoredBuffer) (storeRuntime.getInputHandler().getmBuffer())).setAcking(false);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Acking Disabled in view of throttling that has been activted upfron in the pipeline "
-                            + connectionId);
-                }
-            }
-        }
-    }
-    /** Applies a commit ack to the connection's COLLECT and STORE runtimes, then notifies the intake progress tracker if one exists. */
-    private void handleFeedTupleCommitResponseMessage(FeedTupleCommitResponseMessage commitResponseMessage) {
-        FeedConnectionId connectionId = commitResponseMessage.getConnectionId();
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> runtimes = runtimeManager.getFeedRuntimes();
-        for (FeedRuntimeId runtimeId : runtimes) {
-            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
-            switch (runtimeId.getFeedRuntimeType()) {
-                case COLLECT:
-                    FeedCollectRuntimeInputHandler inputHandler = (FeedCollectRuntimeInputHandler) runtime
-                            .getInputHandler();
-                    int maxBasePersisted = commitResponseMessage.getMaxWindowAcked();
-                    inputHandler.dropTill(IntakePartitionStatistics.ACK_WINDOW_SIZE * (maxBasePersisted + 1));
-                    break;
-                case STORE:
-                    MonitoredBufferStorageTimerTask sTask = runtime.getInputHandler().getmBuffer()
-                            .getStorageTimeTrackingRateTask();
-                    sTask.receiveCommitAckResponse(commitResponseMessage);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        commitResponseMessage.getIntakePartition(); // NOTE(review): return value is discarded; the lookup below uses this operator's partition - confirm
-        SubscribableFeedRuntimeId sid = new SubscribableFeedRuntimeId(connectionId.getFeedId(), FeedRuntimeType.INTAKE,
-                partition);
-        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedManager.getFeedSubscriptionManager()
-                .getSubscribableRuntime(sid);
-        if (ingestionRuntime != null) {
-            IIntakeProgressTracker tracker = ingestionRuntime.getAdapterRuntimeManager().getProgressTracker();
-            if (tracker != null) {
-                tracker.notifyIngestedTupleTimestamp(System.currentTimeMillis());
-            }
-        }
-    }
-    /** Sets the frame collector of every COLLECT runtime of the connection to HANDOVER; throws if none is found. */
-    private void handleTerminateFlowMessage(FeedConnectionId connectionId) throws HyracksDataException {
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
-
-        boolean found = false;
-        for (FeedRuntimeId runtimeId : feedRuntimes) {
-            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
-            if (runtime.getRuntimeId().getRuntimeType().equals(FeedRuntimeType.COLLECT)) {
-                ((CollectionRuntime) runtime).getFrameCollector().setState(State.HANDOVER);
-                found = true;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Switched " + runtime + " to Hand Over stage");
-                }
-            }
-        }
-        if (!found) {
-            throw new HyracksDataException("COLLECT Runtime  not found!");
-        }
-    }
-    /** Sets COMPUTE runtimes to STALL when partition <= the retain limit and to END otherwise; all other runtimes get STALL. */
-    private void handlePrepareStallMessage(PrepareStallMessage prepareStallMessage) throws HyracksDataException {
-        FeedConnectionId connectionId = prepareStallMessage.getConnectionId();
-        int computePartitionsRetainLimit = prepareStallMessage.getComputePartitionsRetainLimit();
-        FeedRuntimeManager runtimeManager = feedManager.getFeedConnectionManager().getFeedRuntimeManager(connectionId);
-        Set<FeedRuntimeId> feedRuntimes = runtimeManager.getFeedRuntimes();
-        for (FeedRuntimeId runtimeId : feedRuntimes) {
-            FeedRuntime runtime = runtimeManager.getFeedRuntime(runtimeId);
-            switch (runtimeId.getFeedRuntimeType()) {
-                case COMPUTE:
-                    Mode requiredMode = runtimeId.getPartition() <= computePartitionsRetainLimit ? Mode.STALL
-                            : Mode.END;
-                    runtime.setMode(requiredMode);
-                    break;
-                default:
-                    runtime.setMode(Mode.STALL);
-                    break;
-            }
-        }
-    }
-    /** Stops the adapter of the INTAKE runtime registered for the message's source feed on this partition. */
-    private void handleDiscontinueFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception {
-        FeedId sourceFeedId = endFeedMessage.getSourceFeedId();
-        SubscribableFeedRuntimeId subscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
-                FeedRuntimeType.INTAKE, partition);
-        ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
-                .getSubscribableRuntime(subscribableRuntimeId);
-        IAdapterRuntimeManager adapterRuntimeManager = ((IngestionRuntime) feedRuntime).getAdapterRuntimeManager(); // NOTE(review): no null check, though the same lookup is null-checked in handleFeedTupleCommitResponseMessage
-        adapterRuntimeManager.stop();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Stopped Adapter " + adapterRuntimeManager);
-        }
-    }
-    /** Handles DISCONNECT_FEED: a complete disconnection unsubscribes the collect runtime from its source; otherwise the matching reader is unsubscribed from the COMPUTE writer. */
-    private void hanldeDisconnectFeedTypeMessage(EndFeedMessage endFeedMessage) throws Exception { // (sic) spelling matches the call in initialize()
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Ending feed:" + endFeedMessage.getFeedConnectionId());
-        }
-        FeedRuntimeId runtimeId = null;
-        FeedRuntimeType subscribableRuntimeType = ((EndFeedMessage) message).getSourceRuntimeType(); // reads the field, not the parameter (same object when called from initialize)
-        if (endFeedMessage.isCompleteDisconnection()) {
-            // subscribableRuntimeType represents the location at which the feed connection receives data
-            FeedRuntimeType runtimeType = null;
-            switch (subscribableRuntimeType) {
-                case INTAKE:
-                    runtimeType = FeedRuntimeType.COLLECT;
-                    break;
-                case COMPUTE:
-                    runtimeType = FeedRuntimeType.COMPUTE_COLLECT;
-                    break;
-                default:
-                    throw new IllegalStateException("Invalid subscribable runtime type " + subscribableRuntimeType);
-            }
-
-            runtimeId = new FeedRuntimeId(runtimeType, partition, FeedRuntimeId.DEFAULT_OPERAND_ID);
-            CollectionRuntime feedRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager()
-                    .getFeedRuntime(connectionId, runtimeId);
-            feedRuntime.getSourceRuntime().unsubscribeFeed(feedRuntime); // NOTE(review): no null check on the looked-up runtime - confirm it cannot be absent
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Complete Unsubscription of " + endFeedMessage.getFeedConnectionId());
-            }
-        } else {
-            // subscribableRuntimeType represents the location for data hand-off in presence of subscribers
-            switch (subscribableRuntimeType) {
-                case INTAKE:
-                    // illegal state as data hand-off from one feed to another does not happen at intake
-                    throw new IllegalStateException("Illegal State, invalid runtime type  " + subscribableRuntimeType);
-                case COMPUTE:
-                    // feed could be primary or secondary, doesn't matter
-                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(
-                            connectionId.getFeedId(), FeedRuntimeType.COMPUTE, partition);
-                    ISubscribableRuntime feedRuntime = feedManager.getFeedSubscriptionManager()
-                            .getSubscribableRuntime(feedSubscribableRuntimeId);
-                    DistributeFeedFrameWriter dWriter = feedRuntime.getFeedFrameWriter();
-                    Map<IFrameWriter, FeedFrameCollector> registeredCollectors = dWriter.getRegisteredReaders();
-
-                    IFrameWriter unsubscribingWriter = null;
-                    for (Entry<IFrameWriter, FeedFrameCollector> entry : registeredCollectors.entrySet()) {
-                        IFrameWriter frameWriter = entry.getKey();
-                        FeedRuntimeInputHandler feedFrameWriter = (FeedRuntimeInputHandler) frameWriter;
-                        if (feedFrameWriter.getConnectionId().equals(endFeedMessage.getFeedConnectionId())) {
-                            unsubscribingWriter = feedFrameWriter;
-                            dWriter.unsubscribeFeed(unsubscribingWriter);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Partial Unsubscription of " + unsubscribingWriter);
-                            }
-                            break; // stop at the first reader whose connection id matches
-                        }
-                    }
-                    break;
-                default:
-                    break;
-            }
-
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Unsubscribed from feed :" + connectionId); // logged even when no matching reader was found above
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
deleted file mode 100644
index f833019..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.DistributeFeedFrameWriter;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.SubscribableRuntime;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/*
- * This IFrameWriter doesn't follow the contract
- */
-public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private int partition;
-
-    private int nPartitions;
-
-    /** The (singleton) instance of IFeedManager **/
-    private IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
-
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        this.feedManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
-                .getApplicationObject()).getFeedManager();
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(connectionId.getFeedId(), runtimeType, partition);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-            writer.open();
-            coreOperator.open();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
-
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
-                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
-        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
-
-        feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
-                recordDesc);
-        feedManager.getFeedSubscriptionManager().registerFeedSubscribableRuntime((ISubscribableRuntime) feedRuntime);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, feedRuntime);
-
-        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
-    }
-
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.inputSideHandler.setCoreOperator(coreOperator);
-
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
-                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
-        coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
-        distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
-
-        inputSideHandler.reset(nPartitions);
-        feedRuntime.setMode(Mode.PROCESS);
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        feedRuntime.setMode(Mode.FAIL);
-        coreOperator.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
-        boolean end = inputSideHandler.getMode().equals(Mode.END);
-        try {
-            if (inputSideHandler != null) {
-                if (!(stalled || end)) {
-                    inputSideHandler.nextFrame(null); // signal end of data
-                    while (!inputSideHandler.isFinished()) {
-                        synchronized (coreOperator) {
-                            coreOperator.wait();
-                        }
-                    }
-                } else {
-                    inputSideHandler.setFinished(true);
-                }
-            }
-            coreOperator.close();
-            System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            if (!stalled) {
-                deregister();
-                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
-            } else {
-                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
-            }
-            if (inputSideHandler != null) {
-                inputSideHandler.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
-            }
-        }
-    }
-
-    private void deregister() {
-        if (feedRuntime != null) {
-            // deregister from subscription manager
-            SubscribableFeedRuntimeId runtimeId = (SubscribableFeedRuntimeId) feedRuntime.getRuntimeId();
-            feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
-
-            // deregister from connection manager
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
deleted file mode 100644
index 86f8750..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-public class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dynamic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private int partition;
-
-    /** Total number of partitions available **/
-    private int nPartitions;
-
-    /** Type associated with the core feed operator **/
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.OTHER;
-
-    /** The (singleton) instance of IFeedManager **/
-    private IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    private final String operandId;
-
-    /** The pre-processor associated with this runtime **/
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
-            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-        this.operandId = operationId;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-            coreOperator.open();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId,
-                (AbstractUnaryInputUnaryOutputOperatorNodePushable) coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), false, fta, recordDesc, feedManager,
-                nPartitions);
-
-        setupBasicRuntime(inputSideHandler);
-    }
-
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.fta = new FrameTupleAccessor(recordDesc);
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        feedRuntime.setMode(Mode.PROCESS);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Retreived state from the zombie instance " + runtimeType + " node.");
-        }
-    }
-
-    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        feedRuntime.setMode(Mode.FAIL);
-        coreOperator.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            coreOperator.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-            // ignore
-        } finally {
-            if (inputSideHandler != null) {
-                inputSideHandler.close();
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
-            }
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
deleted file mode 100644
index 5d88a9e..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * FeedMetaOperatorDescriptor is a wrapper operator that provides a sandbox-like
- * environment for a Hyracks operator that is part of a feed ingestion
- * pipeline. The MetaFeed operator provides an interface identical to that
- * offered by the underlying wrapped operator, hereafter referred to as the core
- * operator. As seen by Hyracks, the altered pipeline is identical to the
- * earlier version formed from core operators. The MetaFeed operator enhances
- * each core operator by providing functionality for handling runtime
- * exceptions, saving any state for future retrieval, and measuring/reporting of
- * performance characteristics. We next describe how the added functionality
- * contributes to providing fault-tolerance.
- */
-
-public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * The actual (Hyracks) operator that is wrapped around by the MetaFeed
-     * operator.
-     **/
-    private IOperatorDescriptor coreOperator;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents the
-     * flow of data from a feed to a dataset.
-     **/
-    private final FeedConnectionId feedConnectionId;
-
-    /**
-     * The policy associated with the feed instance.
-     **/
-    private final Map<String, String> feedPolicyProperties;
-
-    /**
-     * type for the feed runtime associated with the operator.
-     * Possible values: COMPUTE, STORE, OTHER
-     **/
-    private final FeedRuntimeType runtimeType;
-
-    private final String operandId;
-
-    public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
-            IOperatorDescriptor coreOperatorDescriptor, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType runtimeType, boolean enableSubscriptionMode, String operandId) {
-        super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
-        this.feedConnectionId = feedConnectionId;
-        this.feedPolicyProperties = feedPolicyProperties;
-        if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
-            recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
-        }
-        this.coreOperator = coreOperatorDescriptor;
-        this.runtimeType = runtimeType;
-        this.operandId = operandId;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        IOperatorNodePushable nodePushable = null;
-        switch (runtimeType) {
-            case COMPUTE:
-                nodePushable = new FeedMetaComputeNodePushable(ctx, recordDescProvider, partition, nPartitions,
-                        coreOperator, feedConnectionId, feedPolicyProperties, operandId);
-                break;
-            case STORE:
-                nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
-                        coreOperator, feedConnectionId, feedPolicyProperties, operandId);
-                break;
-            case OTHER:
-                nodePushable = new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
-                        feedConnectionId, feedPolicyProperties, operandId);
-                break;
-            case ETS:
-                nodePushable = ((AlgebricksMetaOperatorDescriptor) coreOperator).createPushRuntime(ctx,
-                        recordDescProvider, partition, nPartitions);
-                break;
-            case JOIN:
-                break;
-            default:
-                throw new HyracksDataException(new IllegalArgumentException("Invalid feed runtime: " + runtimeType));
-        }
-        return nodePushable;
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMeta [" + coreOperator + " ]";
-    }
-
-    public IOperatorDescriptor getCoreOperator() {
-        return coreOperator;
-    }
-
-    public FeedRuntimeType getRuntimeType() {
-        return runtimeType;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
deleted file mode 100644
index b409745..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedRuntime;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.FeedRuntimeInputHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IActivity;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetaStoreNodePushable.class.getName());
-
-    /** Runtime node pushable corresponding to the core feed operator **/
-    private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperator;
-
-    /**
-     * A policy enforcer that ensures dyanmic decisions for a feed are taken
-     * in accordance with the associated ingestion policy
-     **/
-    private FeedPolicyEnforcer policyEnforcer;
-
-    /**
-     * The Feed Runtime instance associated with the operator. Feed Runtime
-     * captures the state of the operator while the feed is active.
-     */
-    private FeedRuntime feedRuntime;
-
-    /**
-     * A unique identifier for the feed instance. A feed instance represents
-     * the flow of data from a feed to a dataset.
-     **/
-    private FeedConnectionId connectionId;
-
-    /**
-     * Denotes the i'th operator instance in a setting where K operator
-     * instances are scheduled to run in parallel
-     **/
-    private int partition;
-
-    private int nPartitions;
-
-    /** Type associated with the core feed operator **/
-    private final FeedRuntimeType runtimeType = FeedRuntimeType.STORE;
-
-    /** The (singleton) instance of IFeedManager **/
-    private IFeedManager feedManager;
-
-    private FrameTupleAccessor fta;
-
-    private final IHyracksTaskContext ctx;
-
-    private final String operandId;
-
-    private FeedRuntimeInputHandler inputSideHandler;
-
-    public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
-        this.ctx = ctx;
-        this.coreOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
-                .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicyProperties);
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.connectionId = feedConnectionId;
-        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
-        this.feedManager = runtimeCtx.getFeedManager();
-        this.operandId = operationId;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        try {
-            feedRuntime = feedManager.getFeedConnectionManager().getFeedRuntime(connectionId, runtimeId);
-            if (feedRuntime == null) {
-                initializeNewFeedRuntime(runtimeId);
-            } else {
-                reviveOldFeedRuntime(runtimeId);
-            }
-
-            coreOperator.open();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Runtime not found for  " + runtimeId + " connection id " + connectionId);
-        }
-        this.fta = new FrameTupleAccessor(recordDesc);
-        this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
-                nPartitions);
-        if(coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable){
-            AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
-            if(!indexOp.isPrimary()){
-                inputSideHandler.setBufferingEnabled(false);
-            }
-        }
-        setupBasicRuntime(inputSideHandler);
-    }
-
-    private void reviveOldFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
-        this.inputSideHandler = feedRuntime.getInputHandler();
-        this.fta = new FrameTupleAccessor(recordDesc);
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        this.inputSideHandler.reset(nPartitions);
-        this.inputSideHandler.setCoreOperator(coreOperator);
-        feedRuntime.setMode(Mode.PROCESS);
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Retreived state from the zombie instance from previous execution for " + runtimeType
-                    + " node.");
-        }
-    }
-
-    private void setupBasicRuntime(FeedRuntimeInputHandler inputHandler) throws Exception {
-        coreOperator.setOutputFrameWriter(0, writer, recordDesc);
-        FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, partition, operandId);
-        feedRuntime = new FeedRuntime(runtimeId, inputHandler, writer);
-        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, (FeedRuntime) feedRuntime);
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            inputSideHandler.nextFrame(buffer);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.info("Core Op:" + coreOperator.getDisplayName() + " fail ");
-        }
-        feedRuntime.setMode(Mode.FAIL);
-        coreOperator.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        System.out.println("CLOSE CALLED FOR " + this.feedRuntime.getRuntimeId());
-        boolean stalled = inputSideHandler.getMode().equals(Mode.STALL);
-        try {
-            if (!stalled) {
-                System.out.println("SIGNALLING END OF DATA for " + this.feedRuntime.getRuntimeId() + " mode is "
-                        + inputSideHandler.getMode() + " WAITING ON " + coreOperator);
-                inputSideHandler.nextFrame(null); // signal end of data
-                while (!inputSideHandler.isFinished()) {
-                    synchronized (coreOperator) {
-                        coreOperator.wait();
-                    }
-                }
-                System.out.println("ABOUT TO CLOSE OPERATOR  " + coreOperator);
-            }
-            coreOperator.close();
-        } catch (Exception e) {
-            e.printStackTrace();
-            // ignore
-        } finally {
-            if (!stalled) {
-                deregister();
-                System.out.println("DEREGISTERING " + this.feedRuntime.getRuntimeId());
-            } else {
-                System.out.println("NOT DEREGISTERING " + this.feedRuntime.getRuntimeId());
-            }
-            inputSideHandler.close();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Ending Operator  " + this.feedRuntime.getRuntimeId());
-            }
-        }
-    }
-
-    private void deregister() {
-        if (feedRuntime != null) {
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
-                    ((FeedRuntime) feedRuntime).getRuntimeId());
-        }
-    }
-
-}
\ No newline at end of file