You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:15 UTC

[21/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
deleted file mode 100644
index f1728ce..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedExceptionHandler.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.FrameDataException;
-import org.apache.asterix.common.feeds.api.IExceptionHandler;
-import org.apache.asterix.common.feeds.api.IFeedManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-
-public class FeedExceptionHandler implements IExceptionHandler {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
-
-    private final IHyracksTaskContext ctx;
-    private final FrameTupleAccessor fta;
-    private final RecordDescriptor recordDesc;
-    private final IFeedManager feedManager;
-    private final FeedConnectionId connectionId;
-
-    public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedManager feedManager, FeedConnectionId connectionId) {
-        this.ctx = ctx;
-        this.fta = fta;
-        this.recordDesc = recordDesc;
-        this.feedManager = feedManager;
-        this.connectionId = connectionId;
-    }
-
-    public ByteBuffer handleException(Exception e, ByteBuffer frame) {
-        try {
-            if (e instanceof FrameDataException) {
-                fta.reset(frame);
-                FrameDataException fde = (FrameDataException) e;
-                int tupleIndex = fde.getTupleIndex();
-
-                // logging 
-                try {
-                    logExceptionCausingTuple(tupleIndex, e);
-                } catch (Exception ex) {
-                    ex.addSuppressed(e);
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to log exception causing tuple due to..." + ex.getMessage());
-                    }
-                }
-                // slicing
-                return FeedFrameUtil.getSlicedFrame(ctx, tupleIndex, fta);
-            } else {
-                return null;
-            }
-        } catch (Exception exception) {
-            exception.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to handle exception " + exception.getMessage());
-            }
-            return null;
-        }
-    }
-
-    private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
-
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream di = new DataInputStream(bbis);
-
-        int start = fta.getTupleStartOffset(tupleIndex) + fta.getFieldSlotsLength();
-        bbis.setByteBuffer(fta.getBuffer(), start);
-
-        Object[] record = new Object[recordDesc.getFieldCount()];
-
-        for (int i = 0; i < record.length; ++i) {
-            Object instance = recordDesc.getFields()[i].deserialize(di);
-            if (i == 0) {
-                String tuple = String.valueOf(instance);
-                feedManager.getFeedMetadataManager().logTuple(connectionId, tuple, e.getMessage(), feedManager);
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(", " + String.valueOf(instance));
-                }
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java
deleted file mode 100644
index 55a7fb8..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCache.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Allows caching of feed frames. This class is used in providing upstream backup.
- * The tuples at the intake layer are held in this cache until these are acked by
- * the storage layer post their persistence. On receiving an ack, appropriate tuples
- * (recordsId < ackedRecordId) are dropped from the cache.
- */
-public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
-
-    /**
-     * Value represents a cache feed frame
-     * Key represents the largest record Id in the frame.
-     * At the intake side, the largest record id corresponds to the last record in the frame
-     **/
-    private final Map<Integer, ByteBuffer> orderedCache;
-    private final FrameTupleAccessor tupleAccessor;
-    private final IFrameWriter frameWriter;
-    private final IHyracksTaskContext ctx;
-
-    public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
-        this.tupleAccessor = tupleAccessor;
-        this.frameWriter = frameWriter;
-        /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
-        this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
-        this.ctx = ctx;
-    }
-
-    @Override
-    public void processMessage(ByteBuffer frame) throws Exception {
-        int lastRecordId = getLastRecordId(frame);
-        ByteBuffer clone = cloneFrame(frame);
-        orderedCache.put(lastRecordId, clone);
-    }
-
-    public void dropTillRecordId(int recordId) {
-        List<Integer> dropRecordIds = new ArrayList<Integer>();
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            int recId = entry.getKey();
-            if (recId <= recordId) {
-                dropRecordIds.add(recId);
-            } else {
-                break;
-            }
-        }
-        for (Integer r : dropRecordIds) {
-            orderedCache.remove(r);
-        }
-    }
-
-    public void replayRecords(int startingRecordId) throws HyracksDataException {
-        boolean replayPositionReached = false;
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            // the key increases monotonically
-            int maxRecordIdInFrame = entry.getKey();
-            if (!replayPositionReached) {
-                if (startingRecordId < maxRecordIdInFrame) {
-                    replayFrame(startingRecordId, entry.getValue());
-                    break;
-                } else {
-                    continue;
-                }
-            }
-        }
-    }
-
-    /**
-     * Replay the frame from the tuple (inclusive) with recordId as specified.
-     * 
-     * @param recordId
-     * @param frame
-     * @throws HyracksDataException
-     */
-    private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
-        tupleAccessor.reset(frame);
-        int nTuples = tupleAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; i++) {
-            int rid = getRecordIdAtTupleIndex(i, frame);
-            if (rid == recordId) {
-                ByteBuffer slicedFrame = splitFrame(i, frame);
-                replayFrame(slicedFrame);
-                break;
-            }
-        }
-    }
-
-    private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
-        IFrame slicedFrame = new VSizeFrame(ctx);
-        FrameTupleAppender appender = new FrameTupleAppender();
-        appender.reset(slicedFrame, true);
-        int totalTuples = tupleAccessor.getTupleCount();
-        for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
-            appender.append(tupleAccessor, ti);
-        }
-        return slicedFrame.getBuffer();
-    }
-
-    /**
-     * Replay the frame
-     * 
-     * @param frame
-     * @throws HyracksDataException
-     */
-    private void replayFrame(ByteBuffer frame) throws HyracksDataException {
-        frameWriter.nextFrame(frame);
-    }
-
-    private int getLastRecordId(ByteBuffer frame) {
-        tupleAccessor.reset(frame);
-        int nTuples = tupleAccessor.getTupleCount();
-        return getRecordIdAtTupleIndex(nTuples - 1, frame);
-    }
-
-    private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
-        tupleAccessor.reset(frame);
-        int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
-        int openPartOffset = frame.getInt(recordStart + 6);
-        int numOpenFields = frame.getInt(recordStart + openPartOffset);
-        int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
-                + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
-        int lastRecordId = frame.getInt(recordStart + recordIdOffset);
-        return lastRecordId;
-    }
-
-    private ByteBuffer cloneFrame(ByteBuffer frame) {
-        ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
-        System.arraycopy(frame.array(), 0, clone.array(), 0, frame.limit());
-        return clone;
-    }
-
-    public void replayAll() throws HyracksDataException {
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            ByteBuffer frame = entry.getValue();
-            frameWriter.nextFrame(frame);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java
deleted file mode 100644
index a8c0e8f..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameCollector.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.api.IMessageReceiver;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedFrameCollector extends MessageReceiver<DataBucket> implements IMessageReceiver<DataBucket> {
-
-    private final FeedConnectionId connectionId;
-    private final FrameDistributor frameDistributor;
-    private FeedPolicyAccessor fpa;
-    private IFrameWriter frameWriter;
-    private State state;
-
-    public enum State {
-        ACTIVE,
-        FINISHED,
-        TRANSITION,
-        HANDOVER
-    }
-
-    public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
-            IFrameWriter frameWriter, FeedConnectionId connectionId) {
-        super();
-        this.frameDistributor = frameDistributor;
-        this.fpa = feedPolicyAccessor;
-        this.connectionId = connectionId;
-        this.frameWriter = frameWriter;
-        this.state = State.ACTIVE;
-    }
-
-    @Override
-    public void processMessage(DataBucket bucket) throws Exception {
-        try {
-            ByteBuffer frame = bucket.getContent();
-            switch (bucket.getContentType()) {
-                case DATA:
-                    frameWriter.nextFrame(frame);
-                    break;
-                case EOD:
-                    closeCollector();
-                    break;
-                case EOSD:
-                    throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage());
-            }
-        } finally {
-            bucket.doneReading();
-        }
-    }
-
-    public void closeCollector() {
-        if (state.equals(State.TRANSITION)) {
-            super.close(true);
-            setState(State.ACTIVE);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
-            }
-        } else {
-            flushPendingMessages();
-            setState(State.FINISHED);
-            synchronized (frameDistributor.getRegisteredCollectors()) {
-                frameDistributor.getRegisteredCollectors().notifyAll();
-            }
-            disconnect();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Closed collector " + this);
-        }
-    }
-
-    public synchronized void disconnect() {
-        setState(State.FINISHED);
-    }
-
-    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameWriter.nextFrame(frame);
-    }
-
-    public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return fpa;
-    }
-
-    public synchronized State getState() {
-        return state;
-    }
-
-    public synchronized void setState(State state) {
-        this.state = state;
-        switch (state) {
-            case FINISHED:
-            case HANDOVER:
-                notifyAll();
-                break;
-            default:
-                break;
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
-        }
-    }
-
-    public IFrameWriter getFrameWriter() {
-        return frameWriter;
-    }
-
-    public void setFrameWriter(IFrameWriter frameWriter) {
-        this.frameWriter = frameWriter;
-    }
-
-    @Override
-    public String toString() {
-        return "FrameCollector " + connectionId + "," + state + "]";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o instanceof FeedFrameCollector) {
-            return connectionId.equals(((FeedFrameCollector) o).connectionId);
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return connectionId.toString().hashCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
deleted file mode 100644
index 8609366..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameDiscarder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class FeedFrameDiscarder {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
-
-    private final FeedRuntimeInputHandler inputHandler;
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor policyAccessor;
-    private final float maxFractionDiscard;
-    private int nDiscarded;
-
-    public FeedFrameDiscarder(FeedConnectionId connectionId, FeedRuntimeId runtimeId, FeedPolicyAccessor policyAccessor,
-            FeedRuntimeInputHandler inputHandler) throws IOException {
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.policyAccessor = policyAccessor;
-        this.inputHandler = inputHandler;
-        this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
-    }
-
-    public boolean processMessage(ByteBuffer message) {
-        if (policyAccessor.getMaxFractionDiscard() != 0) {
-            long nProcessed = inputHandler.getProcessed();
-            long discardLimit = (long) (nProcessed * maxFractionDiscard);
-            if (nDiscarded >= discardLimit) {
-                return false;
-            }
-            nDiscarded++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far  ("
-                        + nDiscarded + ") Limit [" + discardLimit + "]");
-            }
-            return true;
-        }
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
deleted file mode 100644
index c4a2ce0..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameHandlers.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedFrameHandler;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedFrameHandlers {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
-
-    public enum RoutingMode {
-        IN_MEMORY_ROUTE,
-        SPILL_TO_DISK,
-        DISCARD
-    }
-
-    public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
-            RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
-        IFeedFrameHandler handler = null;
-        switch (routingMode) {
-            case IN_MEMORY_ROUTE:
-                handler = new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
-                break;
-            case SPILL_TO_DISK:
-                handler = new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
-                break;
-            case DISCARD:
-                handler = new DiscardRouter(distributor, feedId, runtimeType, partition);
-                break;
-            default:
-                throw new IllegalArgumentException("Invalid routing mode" + routingMode);
-        }
-        return handler;
-    }
-
-    public static class DiscardRouter implements IFeedFrameHandler {
-
-        private final FeedId feedId;
-        private int nDiscarded;
-        private final FeedRuntimeType runtimeType;
-        private final int partition;
-        private final FrameDistributor distributor;
-
-        public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
-                throws HyracksDataException {
-            this.distributor = distributor;
-            this.feedId = feedId;
-            this.nDiscarded = 0;
-            this.runtimeType = runtimeType;
-            this.partition = partition;
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            FrameTupleAccessor fta = distributor.getFta();
-            fta.reset(frame);
-            int nTuples = fta.getTupleCount();
-            nDiscarded += nTuples;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + "  " + nTuples);
-            }
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            nDiscarded++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discard Count" + nDiscarded);
-            }
-        }
-
-        @Override
-        public void close() {
-            // do nothing, no resource to relinquish
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            throw new IllegalStateException("Invalid operation");
-        }
-
-        @Override
-        public String toString() {
-            return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
-        }
-
-        @Override
-        public String getSummary() {
-            return new String("Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
-                    + nDiscarded + ")");
-        }
-
-    }
-
-    public static class InMemoryRouter implements IFeedFrameHandler {
-
-        private final Collection<FeedFrameCollector> frameCollectors;
-
-        public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
-                int partition) {
-            this.frameCollectors = frameCollectors;
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            for (FeedFrameCollector collector : frameCollectors) {
-                collector.sendMessage(bucket); // asynchronous call
-            }
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public String getSummary() {
-            return "InMemoryRouter Summary";
-        }
-    }
-
-    public static class DiskSpiller implements IFeedFrameHandler {
-
-        private FrameSpiller<ByteBuffer> receiver;
-        private Iterator<ByteBuffer> iterator;
-
-        public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
-                int frameSize) throws IOException {
-            receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            receiver.sendMessage(frame);
-        }
-
-        private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
-
-            private final FeedId feedId;
-            private BufferedOutputStream bos;
-            private final ByteBuffer reusableLengthBuffer;
-            private final ByteBuffer reusableDataBuffer;
-            private long offset;
-            private File file;
-            private final FrameDistributor frameDistributor;
-            private boolean fileCreated = false;
-
-            public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
-                this.feedId = feedId;
-                this.frameDistributor = distributor;
-                reusableLengthBuffer = ByteBuffer.allocate(4);
-                reusableDataBuffer = ByteBuffer.allocate(frameSize);
-                this.offset = 0;
-            }
-
-            @Override
-            public void processMessage(ByteBuffer message) throws Exception {
-                if (!fileCreated) {
-                    createFile();
-                    fileCreated = true;
-                }
-                reusableLengthBuffer.flip();
-                reusableLengthBuffer.putInt(message.array().length);
-                bos.write(reusableLengthBuffer.array());
-                bos.write(message.array());
-            }
-
-            private void createFile() throws IOException {
-                Date date = new Date();
-                String dateSuffix = date.toString().replace(' ', '_');
-                String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
-                        + frameDistributor.getPartition() + "_" + dateSuffix;
-
-                file = new File(fileName);
-                if (!file.exists()) {
-                    boolean success = file.createNewFile();
-                    if (!success) {
-                        throw new IOException("Unable to create spill file for feed " + feedId);
-                    }
-                }
-                bos = new BufferedOutputStream(new FileOutputStream(file));
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Created Spill File for feed " + feedId);
-                }
-            }
-
-            @SuppressWarnings("resource")
-            public Iterator<ByteBuffer> replayData() throws Exception {
-                final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
-                bis.skip(offset);
-                return new Iterator<ByteBuffer>() {
-
-                    @Override
-                    public boolean hasNext() {
-                        boolean more = false;
-                        try {
-                            more = bis.available() > 0;
-                            if (!more) {
-                                bis.close();
-                            }
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-
-                        return more;
-                    }
-
-                    @Override
-                    public ByteBuffer next() {
-                        reusableLengthBuffer.flip();
-                        try {
-                            bis.read(reusableLengthBuffer.array());
-                            reusableLengthBuffer.flip();
-                            int frameSize = reusableLengthBuffer.getInt();
-                            reusableDataBuffer.flip();
-                            bis.read(reusableDataBuffer.array(), 0, frameSize);
-                            offset += 4 + frameSize;
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-                        return reusableDataBuffer;
-                    }
-
-                    @Override
-                    public void remove() {
-                    }
-
-                };
-            }
-
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public void close() {
-            receiver.close(true);
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            try {
-                iterator = receiver.replayData();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-            return iterator;
-        }
-
-        //TODO: Form a summary that includes stats related to what has been spilled to disk
-        @Override
-        public String getSummary() {
-            return "Disk Spiller Summary";
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java
deleted file mode 100644
index 86187b8..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameSpiller.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class FeedFrameSpiller {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
-
-    private final IHyracksTaskContext ctx;
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor policyAccessor;
-    private BufferedOutputStream bos;
-    private File file;
-    private boolean fileCreated = false;
-    private long bytesWritten = 0;
-    private int spilledFrameCount = 0;
-
-    public FeedFrameSpiller(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            FeedPolicyAccessor policyAccessor) throws IOException {
-        this.ctx = ctx;
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.policyAccessor = policyAccessor;
-    }
-
-    public boolean processMessage(ByteBuffer message) throws Exception {
-        if (!fileCreated) {
-            createFile();
-            fileCreated = true;
-        }
-        long maxAllowed = policyAccessor.getMaxSpillOnDisk();
-        if (maxAllowed != FeedPolicyAccessor.NO_LIMIT && bytesWritten + message.array().length > maxAllowed) {
-            return false;
-        } else {
-            bos.write(message.array());
-            bytesWritten += message.array().length;
-            spilledFrameCount++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Spilled frame by " + runtimeId + " spill count " + spilledFrameCount);
-            }
-            return true;
-        }
-    }
-
-    private void createFile() throws IOException {
-        Date date = new Date();
-        String dateSuffix = date.toString().replace(' ', '_');
-        String fileName = connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
-                + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition() + "_" + dateSuffix;
-
-        file = new File(fileName);
-        if (!file.exists()) {
-            boolean success = file.createNewFile();
-            if (!success) {
-                throw new IOException("Unable to create spill file " + fileName + " for feed " + runtimeId);
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Created spill file " + file.getAbsolutePath());
-                }
-            }
-        }
-        bos = new BufferedOutputStream(new FileOutputStream(file));
-
-    }
-
-    public Iterator<ByteBuffer> replayData() throws Exception {
-        bos.flush();
-        return new FrameIterator(ctx, file.getName());
-    }
-
-    private static class FrameIterator implements Iterator<ByteBuffer> {
-
-        private final BufferedInputStream bis;
-        private final IHyracksTaskContext ctx;
-        private int readFrameCount = 0;
-
-        public FrameIterator(IHyracksTaskContext ctx, String filename) throws FileNotFoundException {
-            bis = new BufferedInputStream(new FileInputStream(new File(filename)));
-            this.ctx = ctx;
-        }
-
-        @Override
-        public boolean hasNext() {
-            boolean more = false;
-            try {
-                more = bis.available() > 0;
-                if (!more) {
-                    bis.close();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            return more;
-        }
-
-        @Override
-        public ByteBuffer next() {
-            IFrame frame  = null;
-            try {
-                frame  = new VSizeFrame(ctx);
-                Arrays.fill(frame.getBuffer().array(), (byte) 0);
-                bis.read(frame.getBuffer().array(), 0, frame.getFrameSize());
-                readFrameCount++;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Read spill frome " + readFrameCount);
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            return frame.getBuffer();
-        }
-
-        @Override
-        public void remove() {
-        }
-
-    }
-
-    public void reset() {
-        bytesWritten = 0;
-        //  file.delete();
-        fileCreated = false;
-        bos = null;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Resetted the FrameSpiller!");
-        }
-    }
-
-    public void close() {
-        if (bos != null) {
-            try {
-                bos.flush();
-                bos.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java
deleted file mode 100644
index 9645bf9..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameTupleAccessor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.feeds.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedFrameTupleAccessor implements IFrameTupleAccessor {
-
-    private final FrameTupleAccessor frameAccessor;
-    private final int numOpenFields;
-
-    public FeedFrameTupleAccessor(FrameTupleAccessor frameAccessor) {
-        this.frameAccessor = frameAccessor;
-        int firstRecordStart = frameAccessor.getTupleStartOffset(0) + frameAccessor.getFieldSlotsLength();
-        int openPartOffsetOrig = frameAccessor.getBuffer().getInt(firstRecordStart + 6);
-        numOpenFields = frameAccessor.getBuffer().getInt(firstRecordStart + openPartOffsetOrig);
-    }
-
-    public int getFeedIntakePartition(int tupleIndex) {
-        ByteBuffer buffer = frameAccessor.getBuffer();
-        int recordStart = frameAccessor.getTupleStartOffset(tupleIndex) + frameAccessor.getFieldSlotsLength();
-        int openPartOffsetOrig = buffer.getInt(recordStart + 6);
-        int partitionOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                + StatisticsConstants.INTAKE_PARTITION.length() + 2 + 1;
-        return buffer.getInt(recordStart + partitionOffset);
-    }
-    
-    
-
-    @Override
-    public int getFieldCount() {
-        return frameAccessor.getFieldCount();
-    }
-
-    @Override
-    public int getFieldSlotsLength() {
-        return frameAccessor.getFieldSlotsLength();
-    }
-
-    @Override
-    public int getFieldEndOffset(int tupleIndex, int fIdx) {
-        return frameAccessor.getFieldEndOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getFieldStartOffset(int tupleIndex, int fIdx) {
-        return frameAccessor.getFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getFieldLength(int tupleIndex, int fIdx) {
-        return frameAccessor.getFieldLength(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getTupleEndOffset(int tupleIndex) {
-        return frameAccessor.getTupleEndOffset(tupleIndex);
-    }
-
-    @Override
-    public int getTupleStartOffset(int tupleIndex) {
-        return frameAccessor.getTupleStartOffset(tupleIndex);
-    }
-
-    @Override
-    public int getTupleCount() {
-        return frameAccessor.getTupleCount();
-    }
-
-    @Override
-    public ByteBuffer getBuffer() {
-        return frameAccessor.getBuffer();
-    }
-
-    @Override
-    public void reset(ByteBuffer buffer) {
-        frameAccessor.reset(buffer);
-    }
-
-    @Override
-    public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
-        return frameAccessor.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
-    }
-
-    @Override
-    public int getTupleLength(int tupleIndex) {
-        return frameAccessor.getTupleLength(tupleIndex);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java
deleted file mode 100644
index baf7a7c..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedFrameUtil.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Random;
-
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-public class FeedFrameUtil {
-
-    public static ByteBuffer getSlicedFrame(IHyracksTaskContext ctx, int tupleIndex, FrameTupleAccessor fta) throws HyracksDataException {
-        FrameTupleAppender appender = new FrameTupleAppender();
-        IFrame slicedFrame = new VSizeFrame(ctx);
-        appender.reset(slicedFrame, true);
-        int startTupleIndex = tupleIndex + 1;
-        int totalTuples = fta.getTupleCount();
-        for (int ti = startTupleIndex; ti < totalTuples; ti++) {
-            appender.append(fta, ti);
-        }
-        return slicedFrame.getBuffer();
-    }
-
-    public static ByteBuffer getSampledFrame(IHyracksTaskContext ctx, FrameTupleAccessor fta, int sampleSize) throws HyracksDataException {
-        NChooseKIterator it = new NChooseKIterator(fta.getTupleCount(), sampleSize);
-        FrameTupleAppender appender = new FrameTupleAppender();
-        IFrame sampledFrame = new VSizeFrame(ctx);
-        appender.reset(sampledFrame, true);
-        int nextTupleIndex = 0;
-        while (it.hasNext()) {
-            nextTupleIndex = it.next();
-            appender.append(fta, nextTupleIndex);
-        }
-        return sampledFrame.getBuffer();
-    }
-    
-  
-
-    private static class NChooseKIterator {
-
-        private final int n;
-        private final int k;
-        private final BitSet bSet;
-        private final Random random;
-
-        private int traversed = 0;
-
-        public NChooseKIterator(int n, int k) {
-            this.n = n;
-            this.k = k;
-            this.bSet = new BitSet(n);
-            bSet.set(0, n - 1);
-            this.random = new Random();
-        }
-
-        public boolean hasNext() {
-            return traversed < k;
-        }
-
-        public int next() {
-            if (hasNext()) {
-                traversed++;
-                int startOffset = random.nextInt(n);
-                int pos = -1;
-                while (pos < 0) {
-                    pos = bSet.nextSetBit(startOffset);
-                    if (pos < 0) {
-                        startOffset = 0;
-                    }
-                }
-                bSet.clear(pos);
-                return pos;
-            } else {
-                return -1;
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java
deleted file mode 100644
index 81b7c4e..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-
-/**
- * A unique identifier for a data feed.
- */
-public class FeedId implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String dataverse;
-    private final String feedName;
-
-    public FeedId(String dataverse, String feedName) {
-        this.dataverse = dataverse;
-        this.feedName = feedName;
-    }
-
-    public String getDataverse() {
-        return dataverse;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !(o instanceof FeedId)) {
-            return false;
-        }
-        if (this == o || ((FeedId) o).getFeedName().equals(feedName) && ((FeedId) o).getDataverse().equals(dataverse)) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverse + "." + feedName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java
deleted file mode 100644
index 0382b23..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedIntakeInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-
-import org.apache.asterix.common.feeds.api.IFeedJoint;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedIntakeInfo extends FeedJobInfo {
-
-    private final FeedId feedId;
-    private final IFeedJoint intakeFeedJoint;
-    private final JobSpecification spec;
-    private List<String> intakeLocation;
-
-    public FeedIntakeInfo(JobId jobId, FeedJobState state, JobType jobType, FeedId feedId, IFeedJoint intakeFeedJoint,
-            JobSpecification spec) {
-        super(jobId, state, FeedJobInfo.JobType.INTAKE, spec);
-        this.feedId = feedId;
-        this.intakeFeedJoint = intakeFeedJoint;
-        this.spec = spec;
-    }
-
-    public FeedId getFeedId() {
-        return feedId;
-    }
-
-    public IFeedJoint getIntakeFeedJoint() {
-        return intakeFeedJoint;
-    }
-
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public List<String> getIntakeLocation() {
-        return intakeLocation;
-    }
-
-    public void setIntakeLocation(List<String> intakeLocation) {
-        this.intakeLocation = intakeLocation;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java
deleted file mode 100644
index 2db9955..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJobInfo.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class FeedJobInfo {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedJobInfo.class.getName());
-
-    public enum JobType {
-        INTAKE,
-        FEED_CONNECT
-    }
-
-    public enum FeedJobState {
-        CREATED,
-        ACTIVE,
-        UNDER_RECOVERY,
-        ENDED
-    }
-
-    protected final JobId jobId;
-    protected final JobType jobType;
-    protected FeedJobState state;
-    protected JobSpecification spec;
-
-    public FeedJobInfo(JobId jobId, FeedJobState state, JobType jobType, JobSpecification spec) {
-        this.jobId = jobId;
-        this.state = state;
-        this.jobType = jobType;
-        this.spec = spec;
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public FeedJobState getState() {
-        return state;
-    }
-
-    public void setState(FeedJobState state) {
-        this.state = state;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(this + " is in " + state + " state.");
-        }
-    }
-
-    public JobType getJobType() {
-        return jobType;
-    }
-
-    public JobSpecification getSpec() {
-        return spec;
-    }
-
-    public void setSpec(JobSpecification spec) {
-        this.spec = spec;
-    }
-
-    public String toString() {
-        return jobId + " [" + jobType + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java
deleted file mode 100644
index 8005967..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedJointKey.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Represents a unique identifier for a Feed Joint. A Feed joint is a logical entity located
- * along a feed ingestion pipeline at a point where the tuples moving as part of data flow
- * constitute the feed. The feed joint acts as a network tap and allows the flowing data to be
- * routed to multiple paths.
- */
-public class FeedJointKey {
-
-    private final FeedId primaryFeedId;
-    private final List<String> appliedFunctions;
-    private final String stringRep;
-
-    public FeedJointKey(FeedId feedId, List<String> appliedFunctions) {
-        this.primaryFeedId = feedId;
-        this.appliedFunctions = appliedFunctions;
-        StringBuilder builder = new StringBuilder();
-        builder.append(feedId);
-        builder.append(":");
-        builder.append(StringUtils.join(appliedFunctions, ':'));
-        stringRep = builder.toString();
-    }
-
-    public FeedId getFeedId() {
-        return primaryFeedId;
-    }
-
-    public List<String> getAppliedFunctions() {
-        return appliedFunctions;
-    }
-
-    public String getStringRep() {
-        return stringRep;
-    }
-
-    @Override
-    public final String toString() {
-        return stringRep;
-    }
-
-    @Override
-    public int hashCode() {
-        return stringRep.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || !(o instanceof FeedJointKey)) {
-            return false;
-        }
-        return stringRep.equals(((FeedJointKey) o).stringRep);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java
deleted file mode 100644
index c39d82a..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMemoryManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent;
-import org.apache.asterix.common.feeds.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.common.feeds.api.IFeedMemoryManager;
-
-public class FeedMemoryManager implements IFeedMemoryManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMemoryManager.class.getName());
-    private static final int ALLOCATION_INCREMENT = 10;
-
-    private final AtomicInteger componentId = new AtomicInteger(0);
-    private final String nodeId;
-    private final int budget;
-    private final int frameSize;
-
-    private int committed;
-
-    public FeedMemoryManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) {
-        this.nodeId = nodeId;
-        this.frameSize = frameSize;
-        budget = (int) feedProperties.getMemoryComponentGlobalBudget() / frameSize;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Feed Memory budget " + budget + " frames (frame size=" + frameSize + ")");
-        }
-    }
-
-    @Override
-    public synchronized IFeedMemoryComponent getMemoryComponent(Type type) {
-        IFeedMemoryComponent memoryComponent = null;
-        boolean valid = false;
-        switch (type) {
-            case COLLECTION:
-                valid = committed + START_COLLECTION_SIZE <= budget;
-                if (valid) {
-                    memoryComponent = new FrameCollection(componentId.incrementAndGet(), this, START_COLLECTION_SIZE);
-                }
-                break;
-            case POOL:
-                valid = committed + START_POOL_SIZE <= budget;
-                if (valid) {
-                    memoryComponent = new DataBucketPool(componentId.incrementAndGet(), this, START_POOL_SIZE,
-                            frameSize);
-                }
-                committed += START_POOL_SIZE;
-                break;
-        }
-        if (!valid) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to allocate memory component of type" + type);
-            }
-        }
-        return valid ? memoryComponent : null;
-    }
-
-    @Override
-    public synchronized boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        if (committed + ALLOCATION_INCREMENT > budget) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Memory budget " + budget + " is exhausted. Space left: " + (budget - committed)
-                        + " frames.");
-            }
-            return false;
-        } else {
-            memoryComponent.expand(ALLOCATION_INCREMENT);
-            committed += ALLOCATION_INCREMENT;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Expanded memory component " + memoryComponent + " by " + ALLOCATION_INCREMENT + " " + this);
-            }
-            return true;
-        }
-    }
-
-    @Override
-    public synchronized void releaseMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        int delta = memoryComponent.getTotalAllocation();
-        committed -= delta;
-        memoryComponent.reset();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Reset " + memoryComponent + " and reclaimed " + delta + " frames " + this);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMemoryManager  [" + nodeId + "]" + "(" + committed + "/" + budget + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java
deleted file mode 100644
index 9582602..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMessageService.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.feeds.api.IFeedMessage;
-import org.apache.asterix.common.feeds.api.IFeedMessageService;
-
-/**
- * Sends feed report messages on behalf of an operator instance
- * to the SuperFeedManager associated with the feed.
- */
-public class FeedMessageService implements IFeedMessageService {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
-
-    private final LinkedBlockingQueue<String> inbox;
-    private final FeedMessageHandler mesgHandler;
-    private final String nodeId;
-    private ExecutorService executor;
-
-    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
-        this.inbox = new LinkedBlockingQueue<String>();
-        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
-        this.nodeId = nodeId;
-        this.executor = Executors.newSingleThreadExecutor();
-    }
-
-    public void start() throws Exception {
-
-        executor.execute(mesgHandler);
-    }
-
-    public void stop() {
-        synchronized (mesgHandler.getLock()) {
-            executor.shutdownNow();
-        }
-        mesgHandler.stop();
-    }
-
-    @Override
-    public void sendMessage(IFeedMessage message) {
-        try {
-            JSONObject obj = message.toJSON();
-            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
-            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
-            inbox.add(obj.toString());
-        } catch (JSONException jse) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("JSON exception in parsing message " + message + " exception [" + jse.getMessage() + "]");
-            }
-        }
-    }
-
-    private static class FeedMessageHandler implements Runnable {
-
-        private final LinkedBlockingQueue<String> inbox;
-        private final String host;
-        private final int port;
-        private final Object lock;
-
-        private Socket cfmSocket;
-
-        private static final byte[] EOL = "\n".getBytes();
-
-        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
-            this.inbox = inbox;
-            this.host = host;
-            this.port = port;
-            this.lock = new Object();
-        }
-
-        public void run() {
-            try {
-                cfmSocket = new Socket(host, port);
-                if (cfmSocket != null) {
-                    while (true) {
-                        String message = inbox.take();
-                        synchronized (lock) { // lock prevents message handler from sending incomplete message midst shutdown attempt
-                            cfmSocket.getOutputStream().write(message.getBytes());
-                            cfmSocket.getOutputStream().write(EOL);
-                        }
-                    }
-                } else {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to start feed message service");
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
-                }
-            } finally {
-                stop();
-            }
-
-        }
-
-        public void stop() {
-            if (cfmSocket != null) {
-                try {
-                    cfmSocket.close();
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in closing socket " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public Object getLock() {
-            return lock;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
deleted file mode 100644
index 7b76692..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedMetricCollector.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.api.IFeedMetricCollector;
-
-public class FeedMetricCollector implements IFeedMetricCollector {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetricCollector.class.getName());
-
-    private static final int UNKNOWN = -1;
-
-    private final AtomicInteger globalSenderId = new AtomicInteger(1);
-    private final Map<Integer, Sender> senders = new HashMap<Integer, Sender>();
-    private final Map<Integer, Series> statHistory = new HashMap<Integer, Series>();
-    private final Map<String, Sender> sendersByName = new HashMap<String, Sender>();
-
-    public FeedMetricCollector(String nodeId) {
-    }
-
-    @Override
-    public synchronized int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            ValueType valueType, MetricType metricType) {
-        Sender sender = new Sender(globalSenderId.getAndIncrement(), connectionId, runtimeId, valueType, metricType);
-        senders.put(sender.senderId, sender);
-        sendersByName.put(sender.getDisplayName(), sender);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Sender id " + sender.getSenderId() + " created for " + sender);
-        }
-        return sender.senderId;
-    }
-
-    @Override
-    public void removeReportSender(int senderId) {
-        Sender sender = senders.get(senderId);
-        if (sender != null) {
-            statHistory.remove(senderId);
-            senders.remove(senderId);
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to remove sender Id");
-            }
-            throw new IllegalStateException("Unable to remove sender Id " + senderId + " senders " + senders);
-        }
-    }
-
-    @Override
-    public boolean sendReport(int senderId, int value) {
-        Sender sender = senders.get(senderId);
-        if (sender != null) {
-            Series series = statHistory.get(sender.senderId);
-            if (series == null) {
-                switch (sender.mType) {
-                    case AVG:
-                        series = new SeriesAvg();
-                        break;
-                    case RATE:
-                        series = new SeriesRate();
-                        break;
-                }
-                statHistory.put(sender.senderId, series);
-            }
-            series.addValue(value);
-            return true;
-        }
-        throw new IllegalStateException("Unable to send report sender Id " + senderId + " senders " + senders);
-    }
-
-    @Override
-    public void resetReportSender(int senderId) {
-        Sender sender = senders.get(senderId);
-        if (sender != null) {
-            Series series = statHistory.get(sender.senderId);
-            if (series != null) {
-                series.reset();
-            }
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Sender with id " + senderId + " not found. Unable to reset!");
-            }
-            throw new IllegalStateException("Unable to reset sender Id " + senderId + " senders " + senders);
-        }
-    }
-
-    private static class Sender {
-
-        private final int senderId;
-        private final MetricType mType;
-        private final String displayName;
-
-        public Sender(int senderId, FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
-                MetricType mType) {
-            this.senderId = senderId;
-            this.mType = mType;
-            this.displayName = createDisplayName(connectionId, runtimeId, valueType);
-        }
-
-        @Override
-        public String toString() {
-            return displayName + "[" + senderId + "]" + "(" + mType + ")";
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (!(o instanceof Sender)) {
-                return false;
-            }
-            return ((Sender) o).senderId == senderId;
-        }
-
-        @Override
-        public int hashCode() {
-            return senderId;
-        }
-
-        public static String createDisplayName(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-                ValueType valueType) {
-            return connectionId + " (" + runtimeId.getFeedRuntimeType() + " )" + "[" + runtimeId.getPartition() + "]"
-                    + "{" + valueType + "}";
-        }
-
-        public String getDisplayName() {
-            return displayName;
-        }
-
-        public int getSenderId() {
-            return senderId;
-        }
-    }
-
-    @Override
-    public int getMetric(int senderId) {
-        Sender sender = senders.get(senderId);
-        return getMetric(sender);
-    }
-
-    @Override
-    public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType) {
-        String displayName = Sender.createDisplayName(connectionId, runtimeId, valueType);
-        Sender sender = sendersByName.get(displayName);
-        return getMetric(sender);
-    }
-
-    private int getMetric(Sender sender) {
-        if (sender == null || statHistory.get(sender.getSenderId()) == null) {
-            return UNKNOWN;
-        }
-
-        float result = -1;
-        Series series = statHistory.get(sender.getSenderId());
-        switch (sender.mType) {
-            case AVG:
-                result = ((SeriesAvg) series).getAvg();
-                break;
-            case RATE:
-                result = ((SeriesRate) series).getRate();
-                break;
-        }
-        return (int) result;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
deleted file mode 100644
index cd7d598..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedPolicyAccessor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * A utility class to access the configuration parameters of a feed ingestion policy.
- */
-public class FeedPolicyAccessor implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /** failure configuration **/
-    /** continue feed ingestion after a soft (runtime) failure **/
-    public static final String SOFT_FAILURE_CONTINUE = "soft.failure.continue";
-
-    /** log failed tuple to an asterixdb dataset for future reference **/
-    public static final String SOFT_FAILURE_LOG_DATA = "soft.failure.log.data";
-
-    /** continue feed ingestion after loss of one or more machines (hardware failure) **/
-    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
-
-    /** auto-start a loser feed when the asterixdb instance is restarted **/
-    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
-
-    /** framework provides guarantee that each received feed record will be processed through the ingestion pipeline at least once **/
-    public static final String AT_LEAST_ONE_SEMANTICS = "atleast.once.semantics";
-
-    /** flow control configuration **/
-    /** spill excess tuples to disk if an operator cannot process incoming data at its arrival rate **/
-    public static final String SPILL_TO_DISK_ON_CONGESTION = "spill.to.disk.on.congestion";
-
-    /** the maximum size of data (tuples) that can be spilled to disk **/
-    public static final String MAX_SPILL_SIZE_ON_DISK = "max.spill.size.on.disk";
-
-    /** discard tuples altogether if an operator cannot process incoming data at its arrival rate **/
-    public static final String DISCARD_ON_CONGESTION = "discard.on.congestion";
-
-    /** maximum fraction of ingested data that can be discarded **/
-    public static final String MAX_FRACTION_DISCARD = "max.fraction.discard";
-
-    /** maximum end-to-end delay/latency in persisting a tuple through the feed ingestion pipeline **/
-    public static final String MAX_DELAY_RECORD_PERSISTENCE = "max.delay.record.persistence";
-
-    /** rate limit the inflow of tuples in accordance with the maximum capacity of the pipeline **/
-    public static final String THROTTLING_ENABLED = "throttling.enabled";
-
-    /** elasticity **/
-    public static final String ELASTIC = "elastic";
-
-    /** statistics **/
-    public static final String TIME_TRACKING = "time.tracking";
-
-    /** logging of statistics **/
-    public static final String LOGGING_STATISTICS = "logging.statistics";
-
-    public static final long NO_LIMIT = -1;
-
-    private Map<String, String> feedPolicy;
-
-    public Map<String, String> getFeedPolicy() {
-        return feedPolicy;
-    }
-
-    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
-        this.feedPolicy = feedPolicy;
-    }
-
-    public void reset(Map<String, String> feedPolicy) {
-        this.feedPolicy = feedPolicy;
-    }
-
-    /** Failure recover/reporting **/
-
-    public boolean logDataOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_LOG_DATA, false);
-    }
-
-    public boolean continueOnSoftFailure() {
-        return getBooleanPropertyValue(SOFT_FAILURE_CONTINUE, false);
-    }
-
-    public boolean continueOnHardwareFailure() {
-        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE, false);
-    }
-
-    public boolean autoRestartOnClusterReboot() {
-        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART, false);
-    }
-
-    public boolean atleastOnceSemantics() {
-        return getBooleanPropertyValue(AT_LEAST_ONE_SEMANTICS, false);
-    }
-
-    /** flow control **/
-    public boolean spillToDiskOnCongestion() {
-        return getBooleanPropertyValue(SPILL_TO_DISK_ON_CONGESTION, false);
-    }
-
-    public boolean discardOnCongestion() {
-        return getMaxFractionDiscard() > 0;
-    }
-
-    public boolean throttlingEnabled() {
-        return getBooleanPropertyValue(THROTTLING_ENABLED, false);
-    }
-
-    public long getMaxSpillOnDisk() {
-        return getLongPropertyValue(MAX_SPILL_SIZE_ON_DISK, NO_LIMIT);
-    }
-
-    public float getMaxFractionDiscard() {
-        return getFloatPropertyValue(MAX_FRACTION_DISCARD, 0);
-    }
-
-    public long getMaxDelayRecordPersistence() {
-        return getLongPropertyValue(MAX_DELAY_RECORD_PERSISTENCE, Long.MAX_VALUE);
-    }
-
-    /** Elasticity **/
-    public boolean isElastic() {
-        return getBooleanPropertyValue(ELASTIC, false);
-    }
-
-    /** Statistics **/
-    public boolean isTimeTrackingEnabled() {
-        return getBooleanPropertyValue(TIME_TRACKING, false);
-    }
-
-    /** Logging of statistics **/
-    public boolean isLoggingStatisticsEnabled() {
-        return getBooleanPropertyValue(LOGGING_STATISTICS, false);
-    }
-
-    private boolean getBooleanPropertyValue(String key, boolean defValue) {
-        String v = feedPolicy.get(key);
-        return v == null ? false : Boolean.valueOf(v);
-    }
-
-    private long getLongPropertyValue(String key, long defValue) {
-        String v = feedPolicy.get(key);
-        return v != null ? Long.parseLong(v) : defValue;
-    }
-
-    private float getFloatPropertyValue(String key, float defValue) {
-        String v = feedPolicy.get(key);
-        return v != null ? Float.parseFloat(v) : defValue;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java
deleted file mode 100644
index 87276ec..0000000
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntime.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.feeds;
-
-import org.apache.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.common.feeds.api.IFeedRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-public class FeedRuntime implements IFeedRuntime {
-
-    /** A unique identifier for the runtime **/
-    protected final FeedRuntimeId runtimeId;
-
-    /** The output frame writer associated with the runtime **/
-    protected IFrameWriter frameWriter;
-
-    /** The pre-processor associated with the runtime **/
-    protected FeedRuntimeInputHandler inputHandler;
-
-    public FeedRuntime(FeedRuntimeId runtimeId, FeedRuntimeInputHandler inputHandler, IFrameWriter frameWriter) {
-        this.runtimeId = runtimeId;
-        this.frameWriter = frameWriter;
-        this.inputHandler = inputHandler;
-    }
-
-    public void setFrameWriter(IFeedOperatorOutputSideHandler frameWriter) {
-        this.frameWriter = frameWriter;
-    }
-
-    @Override
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    @Override
-    public IFrameWriter getFeedFrameWriter() {
-        return frameWriter;
-    }
-
-    @Override
-    public String toString() {
-        return runtimeId.toString();
-    }
-
-    @Override
-    public FeedRuntimeInputHandler getInputHandler() {
-        return inputHandler;
-    }
-
-    public Mode getMode() {
-        return inputHandler != null ? inputHandler.getMode() : Mode.PROCESS;
-    }
-
-    public void setMode(Mode mode) {
-        this.inputHandler.setMode(mode);
-    }
-
-}