You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:14 UTC

[20/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java
deleted file mode 100644
index bf3c2c1..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeId.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-
-public class FeedRuntimeId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final String DEFAULT_OPERAND_ID = "N/A";
-
-    private final FeedRuntimeType runtimeType;
-    private final int partition;
-    private final String operandId;
-
-    public FeedRuntimeId(FeedRuntimeType runtimeType, int partition, String operandId) {
-        this.runtimeType = runtimeType;
-        this.partition = partition;
-        this.operandId = operandId;
-    }
-
-    @Override
-    public String toString() {
-        return runtimeType + "[" + partition + "]" + "{" + operandId + "}";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof FeedRuntimeId)) {
-            return false;
-        }
-        FeedRuntimeId other = (FeedRuntimeId) o;
-        return (other.getFeedRuntimeType().equals(runtimeType) && other.getOperandId().equals(operandId) && other
-                .getPartition() == partition);
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    public FeedRuntimeType getFeedRuntimeType() {
-        return runtimeType;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public FeedRuntimeType getRuntimeType() {
-        return runtimeType;
-    }
-
-    public String getOperandId() {
-        return operandId;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
deleted file mode 100644
index 6642df1..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.DataBucket.ContentType;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.message.FeedCongestionMessage;
-import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-/**
- * Provides for error-handling and input-side buffering for a feed runtime.
- */
-public class FeedRuntimeInputHandler implements IFrameWriter {
-
-    private static Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
-
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor feedPolicyAccessor;
-    private boolean bufferingEnabled;
-    private final IExceptionHandler exceptionHandler;
-    private final FeedFrameDiscarder discarder;
-    private final FeedFrameSpiller spiller;
-    private final FeedPolicyAccessor fpa;
-    private final IFeedManager feedManager;
-
-    private IFrameWriter coreOperator;
-    private MonitoredBuffer mBuffer;
-    private DataBucketPool pool;
-    private FrameCollection frameCollection;
-    private Mode mode;
-    private Mode lastMode;
-    private boolean finished;
-    private long nProcessed;
-    private boolean throttlingEnabled;
-
-    private FrameEventCallback frameEventCallback;
-
-    public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled, FrameTupleAccessor fta,
-            RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions) throws IOException {
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.coreOperator = coreOperator;
-        this.bufferingEnabled = bufferingEnabled;
-        this.feedPolicyAccessor = fpa;
-        this.spiller = new FeedFrameSpiller(ctx, connectionId, runtimeId, fpa);
-        this.discarder = new FeedFrameDiscarder(connectionId, runtimeId, fpa, this);
-        this.exceptionHandler = new FeedExceptionHandler(ctx, fta, recordDesc, feedManager, connectionId);
-        this.mode = Mode.PROCESS;
-        this.lastMode = Mode.PROCESS;
-        this.finished = false;
-        this.fpa = fpa;
-        this.feedManager = feedManager;
-        this.pool = (DataBucketPool) feedManager.getFeedMemoryManager()
-                .getMemoryComponent(IFeedMemoryComponent.Type.POOL);
-        this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
-                .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
-        this.frameEventCallback = new FrameEventCallback(fpa, this, coreOperator);
-        this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
-                feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
-                nPartitions, fpa);
-        this.mBuffer.start();
-        this.throttlingEnabled = false;
-    }
-
-    @Override
-    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        try {
-            switch (mode) {
-                case PROCESS:
-                    switch (lastMode) {
-                        case SPILL:
-                        case POST_SPILL_DISCARD:
-                            setMode(Mode.PROCESS_SPILL);
-                            processSpilledBacklog();
-                            break;
-                        case STALL:
-                            setMode(Mode.PROCESS_BACKLOG);
-                            processBufferredBacklog();
-                            break;
-                        default:
-                            break;
-                    }
-                    process(frame);
-                    break;
-                case PROCESS_BACKLOG:
-                case PROCESS_SPILL:
-                    process(frame);
-                    break;
-                case SPILL:
-                    spill(frame);
-                    break;
-                case DISCARD:
-                case POST_SPILL_DISCARD:
-                    discard(frame);
-                    break;
-                case STALL:
-                    switch (runtimeId.getFeedRuntimeType()) {
-                        case COLLECT:
-                        case COMPUTE_COLLECT:
-                        case COMPUTE:
-                        case STORE:
-                            bufferDataUntilRecovery(frame);
-                            break;
-                        default:
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Discarding frame during " + mode + " mode " + this.runtimeId);
-                            }
-                            break;
-                    }
-                    break;
-                case END:
-                case FAIL:
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Ignoring incoming tuples in " + mode + " mode");
-                    }
-                    break;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void bufferDataUntilRecovery(ByteBuffer frame) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Bufferring data until recovery is complete " + this.runtimeId);
-        }
-        if (frameCollection == null) {
-            this.frameCollection = (FrameCollection) feedManager.getFeedMemoryManager()
-                    .getMemoryComponent(IFeedMemoryComponent.Type.COLLECTION);
-        }
-        if (frameCollection == null) {
-            discarder.processMessage(frame);
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Running low on memory! DISCARDING FRAME ");
-            }
-        } else {
-            boolean success = frameCollection.addFrame(frame);
-            if (!success) {
-                if (fpa.spillToDiskOnCongestion()) {
-                    if (frame != null) {
-                        spiller.processMessage(frame);
-                    } // TODO handle the else case
-                } else {
-                    discarder.processMessage(frame);
-                }
-            }
-        }
-    }
-
-    public void reportUnresolvableCongestion() throws HyracksDataException {
-        if (this.runtimeId.getFeedRuntimeType().equals(FeedRuntimeType.COMPUTE)) {
-            FeedCongestionMessage congestionReport = new FeedCongestionMessage(connectionId, runtimeId,
-                    mBuffer.getInflowRate(), mBuffer.getOutflowRate(), mode);
-            feedManager.getFeedMessageService().sendMessage(congestionReport);
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Congestion reported " + this.connectionId + " " + this.runtimeId);
-            }
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unresolvable congestion at " + this.connectionId + " " + this.runtimeId);
-            }
-        }
-    }
-
-    private void processBufferredBacklog() throws HyracksDataException {
-        try {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Processing backlog " + this.runtimeId);
-            }
-
-            if (frameCollection != null) {
-                Iterator<ByteBuffer> backlog = frameCollection.getFrameCollectionIterator();
-                while (backlog.hasNext()) {
-                    process(backlog.next());
-                    nProcessed++;
-                }
-                DataBucket bucket = pool.getDataBucket();
-                bucket.setContentType(ContentType.EOSD);
-                bucket.setDesiredReadCount(1);
-                mBuffer.sendMessage(bucket);
-                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
-                frameCollection = null;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void processSpilledBacklog() throws HyracksDataException {
-        try {
-            Iterator<ByteBuffer> backlog = spiller.replayData();
-            while (backlog.hasNext()) {
-                process(backlog.next());
-                nProcessed++;
-            }
-            DataBucket bucket = pool.getDataBucket();
-            bucket.setContentType(ContentType.EOSD);
-            bucket.setDesiredReadCount(1);
-            mBuffer.sendMessage(bucket);
-            spiller.reset();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    protected void process(ByteBuffer frame) throws HyracksDataException {
-        boolean frameProcessed = false;
-        while (!frameProcessed) {
-            try {
-                if (!bufferingEnabled) {
-                    coreOperator.nextFrame(frame); // synchronous
-                    mBuffer.sendReport(frame);
-                } else {
-                    DataBucket bucket = pool.getDataBucket();
-                    if (bucket != null) {
-                        if (frame != null) {
-                            bucket.reset(frame); // created a copy here
-                            bucket.setContentType(ContentType.DATA);
-                        } else {
-                            bucket.setContentType(ContentType.EOD);
-                        }
-                        bucket.setDesiredReadCount(1);
-                        mBuffer.sendMessage(bucket);
-                        mBuffer.sendReport(frame);
-                        nProcessed++;
-                    } else {
-                        if (fpa.spillToDiskOnCongestion()) {
-                            if (frame != null) {
-                                boolean spilled = spiller.processMessage(frame);
-                                if (spilled) {
-                                    setMode(Mode.SPILL);
-                                } else {
-                                    reportUnresolvableCongestion();
-                                }
-                            }
-                        } else if (fpa.discardOnCongestion()) {
-                            boolean discarded = discarder.processMessage(frame);
-                            if (!discarded) {
-                                reportUnresolvableCongestion();
-                            }
-                        } else if (fpa.throttlingEnabled()) {
-                            setThrottlingEnabled(true);
-                        } else {
-                            reportUnresolvableCongestion();
-                        }
-
-                    }
-                }
-                frameProcessed = true;
-            } catch (Exception e) {
-                if (feedPolicyAccessor.continueOnSoftFailure()) {
-                    frame = exceptionHandler.handleException(e, frame);
-                    if (frame == null) {
-                        frameProcessed = true;
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Encountered exception! " + e.getMessage()
-                                    + "Insufficient information, Cannot extract failing tuple");
-                        }
-                    }
-                } else {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Ingestion policy does not require recovering from tuple. Feed would terminate");
-                    }
-                    mBuffer.close(false);
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-    }
-
-    private void spill(ByteBuffer frame) throws Exception {
-        boolean success = spiller.processMessage(frame);
-        if (!success) {
-            // limit reached
-            setMode(Mode.POST_SPILL_DISCARD);
-            reportUnresolvableCongestion();
-        }
-    }
-
-    private void discard(ByteBuffer frame) throws Exception {
-        boolean success = discarder.processMessage(frame);
-        if (!success) { // limit reached
-            reportUnresolvableCongestion();
-        }
-    }
-
-    public Mode getMode() {
-        return mode;
-    }
-
-    public synchronized void setMode(Mode mode) {
-        if (mode.equals(this.mode)) {
-            return;
-        }
-        this.lastMode = this.mode;
-        this.mode = mode;
-        if (mode.equals(Mode.END)) {
-            this.close();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Switched from " + lastMode + " to " + mode + " " + this.runtimeId);
-        }
-    }
-
-    @Override
-    public void close() {
-        if (mBuffer != null) {
-            boolean disableMonitoring = !this.mode.equals(Mode.STALL);
-            if (frameCollection != null) {
-                feedManager.getFeedMemoryManager().releaseMemoryComponent(frameCollection);
-            }
-            if (pool != null) {
-                feedManager.getFeedMemoryManager().releaseMemoryComponent(pool);
-            }
-            mBuffer.close(false, disableMonitoring);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Closed input side handler for " + this.runtimeId + " disabled monitoring "
-                        + disableMonitoring + " Mode for runtime " + this.mode);
-            }
-        }
-    }
-
-    public IFrameWriter getCoreOperator() {
-        return coreOperator;
-    }
-
-    public void setCoreOperator(IFrameWriter coreOperator) {
-        this.coreOperator = coreOperator;
-        mBuffer.setFrameWriter(coreOperator);
-        frameEventCallback.setCoreOperator(coreOperator);
-    }
-
-    public boolean isFinished() {
-        return finished;
-    }
-
-    public void setFinished(boolean finished) {
-        this.finished = finished;
-    }
-
-    public long getProcessed() {
-        return nProcessed;
-    }
-
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        coreOperator.open();
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        coreOperator.fail();
-    }
-
-    public void reset(int nPartitions) {
-        this.mBuffer.setNumberOfPartitions(nPartitions);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Reset number of partitions to " + nPartitions + " for " + this.runtimeId);
-        }
-        if (mBuffer != null) {
-            mBuffer.reset();
-        }
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public IFeedManager getFeedManager() {
-        return feedManager;
-    }
-
-    public MonitoredBuffer getmBuffer() {
-        return mBuffer;
-    }
-
-    public boolean isThrottlingEnabled() {
-        return throttlingEnabled;
-    }
-
-    public void setThrottlingEnabled(boolean throttlingEnabled) {
-        if (this.throttlingEnabled != throttlingEnabled) {
-            this.throttlingEnabled = throttlingEnabled;
-            IFeedMessage throttlingEnabledMesg = new ThrottlingEnabledFeedMessage(connectionId, runtimeId);
-            feedManager.getFeedMessageService().sendMessage(throttlingEnabledMesg);
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Throttling " + throttlingEnabled + " for " + this.connectionId + "[" + runtimeId + "]");
-            }
-        }
-    }
-
-    public boolean isBufferingEnabled() {
-        return bufferingEnabled;
-    }
-
-    public void setBufferingEnabled(boolean bufferingEnabled) {
-        this.bufferingEnabled = bufferingEnabled;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java
deleted file mode 100644
index abd5daa..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeManager.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedConnectionManager;
-
-public class FeedRuntimeManager {
-
-    private static Logger LOGGER = Logger.getLogger(FeedRuntimeManager.class.getName());
-
-    private final FeedConnectionId connectionId;
-    private final IFeedConnectionManager connectionManager;
-    private final Map<FeedRuntimeId, FeedRuntime> feedRuntimes;
-
-    private final ExecutorService executorService;
-
-    public FeedRuntimeManager(FeedConnectionId connectionId, IFeedConnectionManager feedConnectionManager) {
-        this.connectionId = connectionId;
-        this.feedRuntimes = new ConcurrentHashMap<FeedRuntimeId, FeedRuntime>();
-        this.executorService = Executors.newCachedThreadPool();
-        this.connectionManager = feedConnectionManager;
-    }
-
-    public void close() throws IOException {
-        if (executorService != null) {
-            executorService.shutdownNow();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down executor service for :" + connectionId);
-            }
-        }
-    }
-
-    public FeedRuntime getFeedRuntime(FeedRuntimeId runtimeId) {
-        return feedRuntimes.get(runtimeId);
-    }
-
-    public void registerFeedRuntime(FeedRuntimeId runtimeId, FeedRuntime feedRuntime) {
-        feedRuntimes.put(runtimeId, feedRuntime);
-    }
-
-    public synchronized void deregisterFeedRuntime(FeedRuntimeId runtimeId) {
-        feedRuntimes.remove(runtimeId);
-        if (feedRuntimes.isEmpty()) {
-            connectionManager.deregisterFeed(connectionId);
-        }
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public Set<FeedRuntimeId> getFeedRuntimes() {
-        return feedRuntimes.keySet();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java
deleted file mode 100644
index d7717ac..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeReport.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-public class FeedRuntimeReport {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java
deleted file mode 100644
index ada6566..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitAckMessage.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.message.FeedMessage;
-
-public class FeedTupleCommitAckMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private int intakePartition;
-    private int base;
-    private byte[] commitAcks;
-
-    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
-        super(MessageType.COMMIT_ACK);
-        this.connectionId = connectionId;
-        this.intakePartition = intakePartition;
-        this.base = base;
-        this.commitAcks = commitAcks;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-        obj.put(FeedConstants.MessageConstants.BASE, base);
-        String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
-        obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
-        return obj;
-    }
-
-    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
-        int base = obj.getInt(FeedConstants.MessageConstants.BASE);
-        String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
-        byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
-        return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getIntakePartition() {
-        return intakePartition;
-    }
-
-    public byte[] getCommitAcks() {
-        return commitAcks;
-    }
-
-    public void reset(int intakePartition, int base, byte[] commitAcks) {
-        this.intakePartition = intakePartition;
-        this.base = base;
-        this.commitAcks = commitAcks;
-    }
-
-    public int getBase() {
-        return base;
-    }
-
-    public void setBase(int base) {
-        this.base = base;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java
deleted file mode 100644
index cc32034..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedTupleCommitResponseMessage.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.message.FeedMessage;
-
-public class FeedTupleCommitResponseMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final int intakePartition;
-    private final int maxWindowAcked;
-
-    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
-        super(MessageType.COMMIT_ACK_RESPONSE);
-        this.connectionId = connectionId;
-        this.intakePartition = intakePartition;
-        this.maxWindowAcked = maxWindowAcked;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-        obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
-        return obj;
-    }
-
-    @Override
-    public String toString() {
-        return connectionId + "[" + intakePartition + "]" + "(" + maxWindowAcked + ")";
-    }
-
-    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
-        int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
-        return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getMaxWindowAcked() {
-        return maxWindowAcked;
-    }
-
-    public int getIntakePartition() {
-        return intakePartition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java
deleted file mode 100644
index 9ed547e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameCollection.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-
-/**
- * Represents an expandable collection of frames.
- */
-public class FrameCollection implements IFeedMemoryComponent {
-
-    /** A unique identifier for the feed memory component **/
-    private final int componentId;
-
-    /** A collection of frames (each being a ByteBuffer) **/
-    private final List<ByteBuffer> frames = new LinkedList<ByteBuffer>();
-
-    /** The permitted maximum size, the collection may grow to **/
-    private int maxSize;
-
-    /** The {@link IFeedMemoryManager} for the NodeController **/
-    private final IFeedMemoryManager memoryManager;
-
-    public FrameCollection(int componentId, IFeedMemoryManager memoryManager, int maxSize) {
-        this.componentId = componentId;
-        this.maxSize = maxSize;
-        this.memoryManager = memoryManager;
-    }
-
-    public boolean addFrame(ByteBuffer frame) {
-        if (frames.size() == maxSize) {
-            boolean expansionGranted = memoryManager.expandMemoryComponent(this);
-            if (!expansionGranted) {
-                return false;
-            }
-        }
-        ByteBuffer storageBuffer = ByteBuffer.allocate(frame.capacity());
-        storageBuffer.put(frame);
-        frames.add(storageBuffer);
-        storageBuffer.flip();
-        return true;
-    }
-
-    public Iterator<ByteBuffer> getFrameCollectionIterator() {
-        return frames.iterator();
-    }
-
-    @Override
-    public int getTotalAllocation() {
-        return frames.size();
-    }
-
-    @Override
-    public Type getType() {
-        return Type.COLLECTION;
-    }
-
-    @Override
-    public int getComponentId() {
-        return componentId;
-    }
-
-    @Override
-    public void expand(int delta) {
-        maxSize = maxSize + delta;
-    }
-
-    @Override
-    public void reset() {
-        frames.clear();
-        maxSize = IFeedMemoryManager.START_COLLECTION_SIZE;
-    }
-
-    @Override
-    public String toString() {
-        return "FrameCollection" + "[" + componentId + "]" + "(" + frames.size() + "/" + maxSize + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
deleted file mode 100644
index 9e106fb..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameDistributor.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FrameDistributor {
-
-    private static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
-
-    private static final long MEMORY_AVAILABLE_POLL_PERIOD = 1000; // 1 second
-
-    private final FeedId feedId;
-    private final FeedRuntimeType feedRuntimeType;
-    private final int partition;
-    private final IFeedMemoryManager memoryManager;
-    private final boolean enableSynchronousTransfer;
-    /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
-    private final Map<IFrameWriter, FeedFrameCollector> registeredCollectors;
-    private final FrameTupleAccessor fta;
-
-    private DataBucketPool pool;
-    private DistributionMode distributionMode;
-    private boolean spillToDiskRequired = false;
-
-    public enum DistributionMode {
-        /**
-         * A single feed frame collector is registered for receiving tuples.
-         * Tuple is sent via synchronous call, that is no buffering is involved
-         **/
-        SINGLE,
-
-        /**
-         * Multiple feed frame collectors are concurrently registered for
-         * receiving tuples.
-         **/
-        SHARED,
-
-        /**
-         * Feed tuples are not being processed, irrespective of # of registered
-         * feed frame collectors.
-         **/
-        INACTIVE
-    }
-
-    public FrameDistributor(FeedId feedId, FeedRuntimeType feedRuntimeType, int partition,
-            boolean enableSynchronousTransfer, IFeedMemoryManager memoryManager, FrameTupleAccessor fta)
-                    throws HyracksDataException {
-        this.feedId = feedId;
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.memoryManager = memoryManager;
-        this.enableSynchronousTransfer = enableSynchronousTransfer;
-        this.registeredCollectors = new HashMap<IFrameWriter, FeedFrameCollector>();
-        this.distributionMode = DistributionMode.INACTIVE;
-        this.fta = fta;
-    }
-
-    public void notifyEndOfFeed() {
-        DataBucket bucket = getDataBucket();
-        if (bucket != null) {
-            sendEndOfFeedDataBucket(bucket);
-        } else {
-            while (bucket == null) {
-                try {
-                    Thread.sleep(MEMORY_AVAILABLE_POLL_PERIOD);
-                    bucket = getDataBucket();
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
-            if (bucket != null) {
-                sendEndOfFeedDataBucket(bucket);
-            }
-        }
-    }
-
-    private void sendEndOfFeedDataBucket(DataBucket bucket) {
-        bucket.setContentType(DataBucket.ContentType.EOD);
-        nextBucket(bucket);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("End of feed data packet sent " + this.feedId);
-        }
-    }
-
-    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) {
-        DistributionMode currentMode = distributionMode;
-        switch (distributionMode) {
-            case INACTIVE:
-                if (!enableSynchronousTransfer) {
-                    pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
-                    frameCollector.start();
-                }
-                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
-                setMode(DistributionMode.SINGLE);
-                break;
-            case SINGLE:
-                pool = (DataBucketPool) memoryManager.getMemoryComponent(Type.POOL);
-                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
-                for (FeedFrameCollector reader : registeredCollectors.values()) {
-                    reader.start();
-                }
-                setMode(DistributionMode.SHARED);
-                break;
-            case SHARED:
-                frameCollector.start();
-                registeredCollectors.put(frameCollector.getFrameWriter(), frameCollector);
-                break;
-        }
-        evaluateIfSpillIsEnabled();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(
-                    "Switching to " + distributionMode + " mode from " + currentMode + " mode " + " Feed id " + feedId);
-        }
-    }
-
-    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) {
-        switch (distributionMode) {
-            case INACTIVE:
-                throw new IllegalStateException(
-                        "Invalid attempt to deregister frame collector in " + distributionMode + " mode.");
-            case SHARED:
-                frameCollector.closeCollector();
-                registeredCollectors.remove(frameCollector.getFrameWriter());
-                int nCollectors = registeredCollectors.size();
-                if (nCollectors == 1) {
-                    FeedFrameCollector loneCollector = registeredCollectors.values().iterator().next();
-                    setMode(DistributionMode.SINGLE);
-                    loneCollector.setState(FeedFrameCollector.State.TRANSITION);
-                    loneCollector.closeCollector();
-                    memoryManager.releaseMemoryComponent(pool);
-                    evaluateIfSpillIsEnabled();
-                } else {
-                    if (!spillToDiskRequired) {
-                        evaluateIfSpillIsEnabled();
-                    }
-                }
-                break;
-            case SINGLE:
-                frameCollector.closeCollector();
-                setMode(DistributionMode.INACTIVE);
-                spillToDiskRequired = false;
-                break;
-
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Deregistered frame reader" + frameCollector + " from feed distributor for " + feedId);
-        }
-    }
-
-    public void evaluateIfSpillIsEnabled() {
-        if (!spillToDiskRequired) {
-            for (FeedFrameCollector collector : registeredCollectors.values()) {
-                spillToDiskRequired = spillToDiskRequired
-                        || collector.getFeedPolicyAccessor().spillToDiskOnCongestion();
-                if (spillToDiskRequired) {
-                    break;
-                }
-            }
-        }
-    }
-
-    public boolean deregisterFrameCollector(IFrameWriter frameWriter) {
-        FeedFrameCollector collector = registeredCollectors.get(frameWriter);
-        if (collector != null) {
-            deregisterFrameCollector(collector);
-            return true;
-        }
-        return false;
-    }
-
-    public synchronized void setMode(DistributionMode mode) {
-        this.distributionMode = mode;
-    }
-
-    public boolean isRegistered(IFrameWriter writer) {
-        return registeredCollectors.get(writer) != null;
-    }
-
-    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        switch (distributionMode) {
-            case INACTIVE:
-                break;
-            case SINGLE:
-                FeedFrameCollector collector = registeredCollectors.values().iterator().next();
-                switch (collector.getState()) {
-                    case HANDOVER:
-                    case ACTIVE:
-                        if (enableSynchronousTransfer) {
-                            collector.nextFrame(frame); // processing is synchronous
-                        } else {
-                            handleDataBucket(frame);
-                        }
-                        break;
-                    case TRANSITION:
-                        handleDataBucket(frame);
-                        break;
-                    case FINISHED:
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Discarding fetched tuples, feed has ended [" + registeredCollectors.get(0)
-                                    + "]" + " Feed Id " + feedId + " frame distributor " + this.getFeedRuntimeType());
-                        }
-                        registeredCollectors.remove(0);
-                        break;
-                }
-                break;
-            case SHARED:
-                handleDataBucket(frame);
-                break;
-        }
-    }
-
-    private void nextBucket(DataBucket bucket) {
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            collector.sendMessage(bucket); // asynchronous call
-        }
-    }
-
-    private void handleDataBucket(ByteBuffer frame) throws HyracksDataException {
-        DataBucket bucket = getDataBucket();
-        if (bucket == null) {
-            handleFrameDuringMemoryCongestion(frame);
-        } else {
-            bucket.reset(frame);
-            bucket.setDesiredReadCount(registeredCollectors.size());
-            nextBucket(bucket);
-        }
-    }
-
-    private void handleFrameDuringMemoryCongestion(ByteBuffer frame) throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("Unable to allocate memory, will evaluate the need to spill");
-        }
-        // wait till memory is available
-    }
-
-    private DataBucket getDataBucket() {
-        DataBucket bucket = null;
-        if (pool != null) {
-            bucket = pool.getDataBucket();
-            if (bucket != null) {
-                bucket.setDesiredReadCount(registeredCollectors.size());
-                return bucket;
-            } else {
-                return null;
-            }
-        }
-        return null;
-    }
-
-    public DistributionMode getMode() {
-        return distributionMode;
-    }
-
-    public void close() {
-        switch (distributionMode) {
-            case INACTIVE:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("FrameDistributor is " + distributionMode);
-                }
-                break;
-            case SINGLE:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Disconnecting single frame reader in " + distributionMode + " mode " + " for  feedId "
-                            + feedId + " " + this.feedRuntimeType);
-                }
-                setMode(DistributionMode.INACTIVE);
-                if (!enableSynchronousTransfer) {
-                    notifyEndOfFeed(); // send EOD Data Bucket
-                    waitForCollectorsToFinish();
-                }
-                registeredCollectors.values().iterator().next().disconnect();
-                break;
-            case SHARED:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Signalling End Of Feed; currently operating in " + distributionMode + " mode");
-                }
-                notifyEndOfFeed(); // send EOD Data Bucket
-                waitForCollectorsToFinish();
-                break;
-        }
-    }
-
-    private void waitForCollectorsToFinish() {
-        synchronized (registeredCollectors.values()) {
-            while (!allCollectorsFinished()) {
-                try {
-                    registeredCollectors.values().wait();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    private boolean allCollectorsFinished() {
-        boolean allFinished = true;
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            allFinished = allFinished && collector.getState().equals(FeedFrameCollector.State.FINISHED);
-        }
-        return allFinished;
-    }
-
-    public Collection<FeedFrameCollector> getRegisteredCollectors() {
-        return registeredCollectors.values();
-    }
-
-    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
-        return registeredCollectors;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public DistributionMode getDistributionMode() {
-        return distributionMode;
-    }
-
-    public FeedRuntimeType getFeedRuntimeType() {
-        return feedRuntimeType;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public FrameTupleAccessor getFta() {
-        return fta;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java
deleted file mode 100644
index 5551ce6..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FrameEventCallback.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FrameEventCallback implements IFrameEventCallback {
-
-    private static final Logger LOGGER = Logger.getLogger(FrameEventCallback.class.getName());
-
-    private final FeedPolicyAccessor fpa;
-    private final FeedRuntimeInputHandler inputSideHandler;
-    private IFrameWriter coreOperator;
-
-    public FrameEventCallback(FeedPolicyAccessor fpa, FeedRuntimeInputHandler inputSideHandler,
-            IFrameWriter coreOperator) {
-        this.fpa = fpa;
-        this.inputSideHandler = inputSideHandler;
-        this.coreOperator = coreOperator;
-    }
-
-    @Override
-    public void frameEvent(FrameEvent event) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Frame Event for " + inputSideHandler.getRuntimeId() + " " + event);
-        }
-        if (!event.equals(FrameEvent.FINISHED_PROCESSING_SPILLAGE)
-                && inputSideHandler.getMode().equals(Mode.PROCESS_SPILL)) {
-            return;
-        }
-        switch (event) {
-            case PENDING_WORK_THRESHOLD_REACHED:
-                if (fpa.spillToDiskOnCongestion()) {
-                    inputSideHandler.setMode(Mode.SPILL);
-                } else if (fpa.discardOnCongestion()) {
-                    inputSideHandler.setMode(Mode.DISCARD);
-                } else if (fpa.throttlingEnabled()) {
-                    inputSideHandler.setThrottlingEnabled(true);
-                } else {
-                    try {
-                        inputSideHandler.reportUnresolvableCongestion();
-                    } catch (HyracksDataException e) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to report congestion!!!");
-                        }
-                    }
-                }
-                break;
-            case FINISHED_PROCESSING:
-                inputSideHandler.setFinished(true);
-                synchronized (coreOperator) {
-                    coreOperator.notifyAll();
-                }
-                break;
-            case PENDING_WORK_DONE:
-                switch (inputSideHandler.getMode()) {
-                    case SPILL:
-                    case DISCARD:
-                    case POST_SPILL_DISCARD:
-                        inputSideHandler.setMode(Mode.PROCESS);
-                        break;
-                    default:
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Received " + event + " ignoring as operating in " + inputSideHandler.getMode());
-                        }
-                }
-                break;
-            case FINISHED_PROCESSING_SPILLAGE:
-                inputSideHandler.setMode(Mode.PROCESS);
-                break;
-            default:
-                break;
-        }
-    }
-
-    public void setCoreOperator(IFrameWriter coreOperator) {
-        this.coreOperator = coreOperator;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
deleted file mode 100644
index 926df39..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IngestionRuntime.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.logging.Level;
-
-import org.apache.asterix.common.feeds.api.IAdapterRuntimeManager;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class IngestionRuntime extends SubscribableRuntime {
-
-    private final IAdapterRuntimeManager adapterRuntimeManager;
-
-    public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
-            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
-        super(feedId, runtimeId, null, feedWriter, recordDesc);
-        this.adapterRuntimeManager = adaptorRuntimeManager;
-    }
-
-    public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception {
-        FeedFrameCollector reader = dWriter.subscribeFeed(fpa, collectionRuntime.getInputHandler(),
-                collectionRuntime.getConnectionId());
-        collectionRuntime.setFrameCollector(reader);
-        
-        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
-            adapterRuntimeManager.start();
-        }
-        subscribers.add(collectionRuntime);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
-        }
-    }
-
-    public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception {
-        dWriter.unsubscribeFeed(collectionRuntime.getInputHandler());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Unsubscribed feed collection [" + collectionRuntime + "] from " + this);
-        }
-        if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.INACTIVE)) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Stopping adapter for " + this + " as no more registered collectors");
-            }
-            adapterRuntimeManager.stop();
-        }
-        subscribers.remove(collectionRuntime);
-    }
-
-    public void endOfFeed() {
-        dWriter.notifyEndOfFeed();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Notified End Of Feed  [" + this + "]");
-        }
-    }
-
-    public IAdapterRuntimeManager getAdapterRuntimeManager() {
-        return adapterRuntimeManager;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
deleted file mode 100644
index 5601f73..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakePartitionStatistics.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.BitSet;
-
-public class IntakePartitionStatistics {
-
-    public static int ACK_WINDOW_SIZE = 1024;
-    private BitSet bitSet;
-
-    public IntakePartitionStatistics(int partition, int base) {
-        this.bitSet = new BitSet(ACK_WINDOW_SIZE);
-    }
-
-    public void ackRecordId(int recordId) {
-        int posIndexWithinBase = recordId % ACK_WINDOW_SIZE;
-        this.bitSet.set(posIndexWithinBase);
-    }
-
-    public byte[] getAckInfo() {
-        return bitSet.toByteArray();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java
deleted file mode 100644
index 10b7ddb..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/IntakeSideMonitoredBuffer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-import org.apache.asterix.common.feeds.api.IFrameEventCallback;
-import org.apache.asterix.common.feeds.api.IFramePostProcessor;
-import org.apache.asterix.common.feeds.api.IFramePreprocessor;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class IntakeSideMonitoredBuffer extends MonitoredBuffer {
-
-    public IntakeSideMonitoredBuffer(IHyracksTaskContext ctx, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedMetricCollector metricCollector,
-            FeedConnectionId connectionId, FeedRuntimeId runtimeId, IExceptionHandler exceptionHandler,
-            IFrameEventCallback callback, int nPartitions, FeedPolicyAccessor policyAccessor) {
-        super(ctx, inputHandler, frameWriter, fta,  recordDesc, metricCollector, connectionId, runtimeId,
-                exceptionHandler, callback, nPartitions, policyAccessor);
-    }
-
-    @Override
-    protected boolean monitorProcessingRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean logInflowOutflowRate() {
-        return false;
-    }
-
-    @Override
-    protected IFramePreprocessor getFramePreProcessor() {
-        return null;
-    }
-
-    @Override
-    protected IFramePostProcessor getFramePostProcessor() {
-        return null;
-    }
-
-    @Override
-    protected boolean monitorInputQueueLength() {
-        return false;
-    }
-
-    @Override
-    protected boolean reportOutflowRate() {
-        return false;
-    }
-
-    @Override
-    protected boolean reportInflowRate() {
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java
deleted file mode 100644
index c2753d4..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageListener.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class MessageListener {
-
-    private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
-
-    private int port;
-    private final LinkedBlockingQueue<String> outbox;
-
-    private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    private MessageListenerServer listenerServer;
-
-    public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
-        this.port = port;
-        this.outbox = outbox;
-    }
-
-    public void stop() {
-        listenerServer.stop();
-        if (!executorService.isShutdown()) {
-            executorService.shutdownNow();
-        }
-    }
-
-    public void start() throws IOException {
-        listenerServer = new MessageListenerServer(port, outbox);
-        executorService.execute(listenerServer);
-    }
-
-    private static class MessageListenerServer implements Runnable {
-
-        private final int port;
-        private final LinkedBlockingQueue<String> outbox;
-        private ServerSocket server;
-
-        private static final char EOL = (char) "\n".getBytes()[0];
-
-        public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
-            this.port = port;
-            this.outbox = outbox;
-        }
-
-        public void stop() {
-            try {
-                server.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void run() {
-            Socket client = null;
-            try {
-                server = new ServerSocket(port);
-                client = server.accept();
-                InputStream in = client.getInputStream();
-                CharBuffer buffer = CharBuffer.allocate(5000);
-                char ch;
-                while (true) {
-                    ch = (char) in.read();
-                    if (((int) ch) == -1) {
-                        break;
-                    }
-                    while (ch != EOL) {
-                        buffer.put(ch);
-                        ch = (char) in.read();
-                    }
-                    buffer.flip();
-                    String s = new String(buffer.array());
-                    synchronized (outbox) {
-                        outbox.add(s + "\n");
-                    }
-                    buffer.position(0);
-                    buffer.limit(5000);
-                }
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to start Message listener" + server);
-                }
-            } finally {
-                if (server != null) {
-                    try {
-                        server.close();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java
deleted file mode 100644
index 6490c6a..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/MessageReceiver.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IMessageReceiver;
-
-public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
-
-    protected static final Logger LOGGER = Logger.getLogger(MessageReceiver.class.getName());
-
-    protected final LinkedBlockingQueue<T> inbox;
-    protected ExecutorService executor;
-
-    public MessageReceiver() {
-        inbox = new LinkedBlockingQueue<T>();
-    }
-
-    public abstract void processMessage(T message) throws Exception;
-
-    @Override
-    public void start() {
-        executor = Executors.newSingleThreadExecutor();
-        executor.execute(new MessageReceiverRunnable<T>(this));
-    }
-
-    @Override
-    public synchronized void sendMessage(T message) {
-        inbox.add(message);
-    }
-
-    @Override
-    public void close(boolean processPending) {
-        if (executor != null) {
-            executor.shutdown();
-            executor = null;
-            if (processPending) {
-                flushPendingMessages();
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Will discard the pending frames " + inbox.size());
-                }
-            }
-        }
-    }
-
-    private static class MessageReceiverRunnable<T> implements Runnable {
-
-        private final LinkedBlockingQueue<T> inbox;
-        private final MessageReceiver<T> messageReceiver;
-
-        public MessageReceiverRunnable(MessageReceiver<T> messageReceiver) {
-            this.inbox = messageReceiver.inbox;
-            this.messageReceiver = messageReceiver;
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    T message = inbox.take();
-                    messageReceiver.processMessage(message);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    protected void flushPendingMessages() {
-        while (!inbox.isEmpty()) {
-            T message = null;
-            try {
-                message = inbox.take();
-                processMessage(message);
-            } catch (InterruptedException ie) {
-                // ignore exception but break from the loop
-                break;
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception " + e + " in processing message " + message);
-                }
-            }
-        }
-    }
-
-}