You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:03:58 UTC

[5/9] incubator-asterixdb git commit: Cleanup Feed CodeBase

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
deleted file mode 100644
index 22dcfac..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/StorageFrameHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.asterix.external.feed.watch.IntakePartitionStatistics;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class StorageFrameHandler {
-
-    private final Map<Integer, Map<Integer, IntakePartitionStatistics>> intakeStatistics;
-    private long avgDelayPersistence;
-
-    public StorageFrameHandler() {
-        intakeStatistics = new HashMap<Integer, Map<Integer, IntakePartitionStatistics>>();
-        avgDelayPersistence = 0L;
-    }
-
-    public synchronized void updateTrackingInformation(ByteBuffer frame, FrameTupleAccessor frameAccessor) {
-        int nTuples = frameAccessor.getTupleCount();
-        long delay = 0;
-        long intakeTimestamp;
-        long currentTime = System.currentTimeMillis();
-        int partition = 0;
-        int recordId = 0;
-        for (int i = 0; i < nTuples; i++) {
-            int recordStart = frameAccessor.getTupleStartOffset(i) + frameAccessor.getFieldSlotsLength();
-            int openPartOffsetOrig = frame.getInt(recordStart + 6);
-            int numOpenFields = frame.getInt(recordStart + openPartOffsetOrig);
-
-            int recordIdOffset = openPartOffsetOrig + 4 + 8 * numOpenFields
-                    + (StatisticsConstants.INTAKE_TUPLEID.length() + 2) + 1;
-            recordId = frame.getInt(recordStart + recordIdOffset);
-
-            int partitionOffset = recordIdOffset + 4 + (StatisticsConstants.INTAKE_PARTITION.length() + 2) + 1;
-            partition = frame.getInt(recordStart + partitionOffset);
-
-            ackRecordId(partition, recordId);
-            int intakeTimestampValueOffset = partitionOffset + 4 + (StatisticsConstants.INTAKE_TIMESTAMP.length() + 2)
-                    + 1;
-            intakeTimestamp = frame.getLong(recordStart + intakeTimestampValueOffset);
-
-            int storeTimestampValueOffset = intakeTimestampValueOffset + 8
-                    + (StatisticsConstants.STORE_TIMESTAMP.length() + 2) + 1;
-            frame.putLong(recordStart + storeTimestampValueOffset, System.currentTimeMillis());
-            delay += currentTime - intakeTimestamp;
-        }
-        avgDelayPersistence = delay / nTuples;
-    }
-
-    private void ackRecordId(int partition, int recordId) {
-        Map<Integer, IntakePartitionStatistics> map = intakeStatistics.get(partition);
-        if (map == null) {
-            map = new HashMap<Integer, IntakePartitionStatistics>();
-            intakeStatistics.put(partition, map);
-        }
-        int base = (int) Math.ceil(recordId * 1.0 / IntakePartitionStatistics.ACK_WINDOW_SIZE);
-        IntakePartitionStatistics intakeStatsForBaseOfPartition = map.get(base);
-        if (intakeStatsForBaseOfPartition == null) {
-            intakeStatsForBaseOfPartition = new IntakePartitionStatistics(partition, base);
-            map.put(base, intakeStatsForBaseOfPartition);
-        }
-        intakeStatsForBaseOfPartition.ackRecordId(recordId);
-    }
-
-    public byte[] getAckData(int partition, int base) {
-        Map<Integer, IntakePartitionStatistics> intakeStats = intakeStatistics.get(partition);
-        if (intakeStats != null) {
-            IntakePartitionStatistics intakePartitionStats = intakeStats.get(base);
-            if (intakePartitionStats != null) {
-                return intakePartitionStats.getAckInfo();
-            }
-        }
-        return null;
-    }
-
-    public synchronized Map<Integer, IntakePartitionStatistics> getBaseAcksForPartition(int partition) {
-        Map<Integer, IntakePartitionStatistics> intakeStatsForPartition = intakeStatistics.get(partition);
-        Map<Integer, IntakePartitionStatistics> clone = new HashMap<Integer, IntakePartitionStatistics>();
-        for (Entry<Integer, IntakePartitionStatistics> entry : intakeStatsForPartition.entrySet()) {
-            clone.put(entry.getKey(), entry.getValue());
-        }
-        return intakeStatsForPartition;
-    }
-
-    public long getAvgDelayPersistence() {
-        return avgDelayPersistence;
-    }
-
-    public void setAvgDelayPersistence(long avgDelayPersistence) {
-        this.avgDelayPersistence = avgDelayPersistence;
-    }
-
-    public Set<Integer> getPartitionsWithStats() {
-        return intakeStatistics.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
new file mode 100644
index 0000000..1bdc7e1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class SyncFeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final FeedExceptionHandler exceptionHandler;
+
+    public SyncFeedRuntimeInputHandler(IHyracksTaskContext ctx, IFrameWriter writer, FrameTupleAccessor fta) {
+        this.writer = writer;
+        this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+        while (frame != null) {
+            try {
+                writer.nextFrame(frame);
+                return;
+            } catch (HyracksDataException e) {
+                frame = exceptionHandler.handle(e, frame);
+                if (frame == null) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        writer.flush();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
new file mode 100644
index 0000000..25aa86a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.external.feed.dataflow.FrameAction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ConcurrentFramePool {
+    private static final String ERROR_INVALID_FRAME_SIZE =
+            "The size should be an integral multiple of the default frame size";
+    private final String nodeId;
+    private final int budget;
+    private final int defaultFrameSize;
+    private final ArrayDeque<ByteBuffer> pool;
+    private final ArrayDeque<FrameAction> subscribers = new ArrayDeque<>();
+    private final Map<Integer, ArrayDeque<ByteBuffer>> largeFramesPools;
+    private int handedOut;
+    private int created;
+
+    public ConcurrentFramePool(String nodeId, long budgetInBytes, int frameSize) {
+        this.nodeId = nodeId;
+        this.defaultFrameSize = frameSize;
+        this.budget = (int) (budgetInBytes / frameSize);
+        this.pool = new ArrayDeque<>(budget);
+        this.largeFramesPools = new HashMap<>();
+    }
+
+    public synchronized ByteBuffer get() {
+        if (handedOut < budget) {
+            handedOut++;
+            return allocate();
+        }
+        return null;
+    }
+
+    public int remaining() {
+        return budget - handedOut;
+    }
+
+    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+        if (bufferSize % defaultFrameSize != 0) {
+            throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
+        }
+        int multiplier = bufferSize / defaultFrameSize;
+        if (handedOut + multiplier <= budget) {
+            handedOut += multiplier;
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
+            if (largeFramesPool == null || largeFramesPool.isEmpty()) {
+                if (created + multiplier > budget) {
+                    freeup(multiplier);
+                }
+                created += multiplier;
+                return ByteBuffer.allocate(bufferSize);
+            }
+            return largeFramesPool.poll();
+        }
+        // Not enough budget
+        return null;
+    }
+
+    private int freeup(int desiredNumberOfFreePages) {
+        int needToFree = desiredNumberOfFreePages - (budget - created);
+        int freed = 0;
+        // start by large frames
+        for (Iterator<Entry<Integer, ArrayDeque<ByteBuffer>>> it = largeFramesPools.entrySet().iterator(); it
+                .hasNext();) {
+            Entry<Integer, ArrayDeque<ByteBuffer>> entry = it.next();
+            if (entry.getKey() != desiredNumberOfFreePages) {
+                while (!entry.getValue().isEmpty()) {
+                    entry.getValue().pop();
+                    freed += entry.getKey();
+                    if (freed >= needToFree) {
+                        // created is handled here
+                        created -= freed;
+                        return freed;
+                    }
+                }
+                it.remove();
+            }
+        }
+        // freed all large pages. need to free small pages as well
+        needToFree -= freed;
+        while (needToFree > 0) {
+            pool.pop();
+            needToFree--;
+            freed++;
+        }
+        created -= freed;
+        return freed;
+    }
+
+    private ByteBuffer allocate() {
+        if (pool.isEmpty()) {
+            if (created == budget) {
+                freeup(1);
+            }
+            created++;
+            return ByteBuffer.allocate(defaultFrameSize);
+        } else {
+            return pool.pop();
+        }
+    }
+
+    public synchronized boolean get(Collection<ByteBuffer> buffers, int count) {
+        if (handedOut + count <= budget) {
+            handedOut += count;
+            for (int i = 0; i < count; i++) {
+                buffers.add(allocate());
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ConcurrentFramePool  [" + nodeId + "]" + "(consumed:" + handedOut + "/" + budget + ")";
+    }
+
+    public synchronized void release(Collection<ByteBuffer> buffers) throws HyracksDataException {
+        for (ByteBuffer buffer : buffers) {
+            release(buffer);
+        }
+    }
+
+    public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
+        int multiples = buffer.capacity() / defaultFrameSize;
+        handedOut -= multiples;
+        if (multiples == 1) {
+            pool.add(buffer);
+        } else {
+            ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiples);
+            if (largeFramesPool == null) {
+                largeFramesPool = new ArrayDeque<>();
+                largeFramesPools.put(multiples, largeFramesPool);
+            }
+            largeFramesPool.push(buffer);
+        }
+        // check subscribers
+        while (!subscribers.isEmpty()) {
+            FrameAction frameAction = subscribers.peek();
+            // check if we have enough and answer immediately.
+            if (frameAction.getSize() == defaultFrameSize) {
+                buffer = get();
+            } else {
+                buffer = get(frameAction.getSize());
+            }
+            if (buffer != null) {
+                try {
+                    frameAction.call(buffer);
+                } finally {
+                    subscribers.remove();
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
+        // check if subscribers are empty?
+        if (subscribers.isEmpty()) {
+            ByteBuffer buffer;
+            // check if we have enough and answer immediately.
+            if (frameAction.getSize() == defaultFrameSize) {
+                buffer = get();
+            } else {
+                buffer = get(frameAction.getSize());
+            }
+            if (buffer != null) {
+                frameAction.call(buffer);
+                // There is no need to subscribe. perform action and return false
+                return false;
+            }
+        }
+        // none of the above, add to subscribers and return true
+        subscribers.add(frameAction);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
index 1af7153..13f19b8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionId.java
@@ -29,16 +29,17 @@ public class FeedConnectionId implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final FeedId feedId;            // Dataverse - Feed
-    private final String datasetName;       // Dataset
+    private final String datasetName;       // Dataset <Dataset is empty in case of no target dataset>
+    private final int hash;
 
     public FeedConnectionId(FeedId feedId, String datasetName) {
         this.feedId = feedId;
         this.datasetName = datasetName;
+        this.hash = toString().hashCode();
     }
 
     public FeedConnectionId(String dataverse, String feedName, String datasetName) {
-        this.feedId = new FeedId(dataverse, feedName);
-        this.datasetName = datasetName;
+        this(new FeedId(dataverse, feedName), datasetName);
     }
 
     public FeedId getFeedId() {
@@ -64,7 +65,7 @@ public class FeedConnectionId implements Serializable {
 
     @Override
     public int hashCode() {
-        return toString().hashCode();
+        return hash;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
index dd2fc60..a746ef4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedConnectionManager.java
@@ -46,6 +46,7 @@ public class FeedConnectionManager implements IFeedConnectionManager {
         this.nodeId = nodeId;
     }
 
+    @Override
     public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
         return feedRuntimeManagers.get(feedId);
     }
@@ -67,8 +68,7 @@ public class FeedConnectionManager implements IFeedConnectionManager {
     }
 
     @Override
-    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime)
-            throws Exception {
+    public synchronized void registerFeedRuntime(FeedConnectionId connectionId, FeedRuntime feedRuntime) {
         FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(connectionId);
         if (runtimeMgr == null) {
             runtimeMgr = new FeedRuntimeManager(connectionId, this);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
index 5095e7d..62f4c6c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
@@ -18,22 +18,14 @@
  */
 package org.apache.asterix.external.feed.management;
 
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.config.AsterixFeedProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.feed.api.IFeedConnectionManager;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.feed.api.IFeedMetadataManager;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector;
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.feed.message.FeedMessageService;
-import org.apache.asterix.external.feed.watch.FeedMetricCollector;
-import org.apache.asterix.external.feed.watch.NodeLoadReportService;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -41,23 +33,13 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
  * Provider necessary central repository for registering/retrieving
  * artifacts/services associated with a feed.
  */
-public class FeedManager implements IFeedManager {
+public class FeedManager {
 
-    private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
-
-    private final IFeedSubscriptionManager feedSubscriptionManager;
+    private final Map<FeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
 
     private final IFeedConnectionManager feedConnectionManager;
 
-    private final IFeedMemoryManager feedMemoryManager;
-
-    private final IFeedMetricCollector feedMetricCollector;
-
-    private final IFeedMetadataManager feedMetadataManager;
-
-    private final IFeedMessageService feedMessageService;
-
-    private final NodeLoadReportService nodeLoadReportService;
+    private final ConcurrentFramePool feedMemoryManager;
 
     private final AsterixFeedProperties asterixFeedProperties;
 
@@ -68,60 +50,39 @@ public class FeedManager implements IFeedManager {
     public FeedManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize)
             throws AsterixException, HyracksDataException {
         this.nodeId = nodeId;
-        this.feedSubscriptionManager = new FeedSubscriptionManager(nodeId);
         this.feedConnectionManager = new FeedConnectionManager(nodeId);
-        this.feedMetadataManager = new FeedMetadataManager(nodeId);
-        this.feedMemoryManager = new FeedMemoryManager(nodeId, feedProperties, frameSize);
-        String ccClusterIp = AsterixClusterProperties.INSTANCE.getCluster() != null
-                ? AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp() : "localhost";
-        this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
-        this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
-        try {
-            this.feedMessageService.start();
-            this.nodeLoadReportService.start();
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to start feed services " + e.getMessage());
-            }
-            e.printStackTrace();
-        }
-        this.feedMetricCollector = new FeedMetricCollector(nodeId);
+        this.feedMemoryManager =
+                new ConcurrentFramePool(nodeId, feedProperties.getMemoryComponentGlobalBudget(), frameSize);
         this.frameSize = frameSize;
         this.asterixFeedProperties = feedProperties;
+        this.subscribableRuntimes = new ConcurrentHashMap<FeedRuntimeId, ISubscribableRuntime>();
     }
 
-    @Override
-    public IFeedSubscriptionManager getFeedSubscriptionManager() {
-        return feedSubscriptionManager;
-    }
-
-    @Override
     public IFeedConnectionManager getFeedConnectionManager() {
         return feedConnectionManager;
     }
 
-    @Override
-    public IFeedMemoryManager getFeedMemoryManager() {
+    public ConcurrentFramePool getFeedMemoryManager() {
         return feedMemoryManager;
     }
 
-    @Override
-    public IFeedMetricCollector getFeedMetricCollector() {
-        return feedMetricCollector;
-    }
-
     public int getFrameSize() {
         return frameSize;
     }
 
-    @Override
-    public IFeedMetadataManager getFeedMetadataManager() {
-        return feedMetadataManager;
+    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
+        FeedRuntimeId sid = subscribableRuntime.getRuntimeId();
+        if (!subscribableRuntimes.containsKey(sid)) {
+            subscribableRuntimes.put(sid, subscribableRuntime);
+        }
     }
 
-    @Override
-    public IFeedMessageService getFeedMessageService() {
-        return feedMessageService;
+    public void deregisterFeedSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
+        subscribableRuntimes.remove(subscribableRuntimeId);
+    }
+
+    public ISubscribableRuntime getSubscribableRuntime(FeedRuntimeId subscribableRuntimeId) {
+        return subscribableRuntimes.get(subscribableRuntimeId);
     }
 
     @Override
@@ -129,7 +90,6 @@ public class FeedManager implements IFeedManager {
         return "FeedManager " + "[" + nodeId + "]";
     }
 
-    @Override
     public AsterixFeedProperties getAsterixFeedProperties() {
         return asterixFeedProperties;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java
deleted file mode 100644
index de9d22c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMemoryManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
-import org.apache.asterix.external.feed.dataflow.DataBucketPool;
-import org.apache.asterix.external.feed.dataflow.FrameCollection;
-
-public class FeedMemoryManager implements IFeedMemoryManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMemoryManager.class.getName());
-    private static final int ALLOCATION_INCREMENT = 10;
-
-    private final AtomicInteger componentId = new AtomicInteger(0);
-    private final String nodeId;
-    private final int budget;
-    private final int frameSize;
-
-    private int committed;
-
-    public FeedMemoryManager(String nodeId, AsterixFeedProperties feedProperties, int frameSize) {
-        this.nodeId = nodeId;
-        this.frameSize = frameSize;
-        budget = (int) feedProperties.getMemoryComponentGlobalBudget() / frameSize;
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Feed Memory budget " + budget + " frames (frame size=" + frameSize + ")");
-        }
-    }
-
-    @Override
-    public synchronized IFeedMemoryComponent getMemoryComponent(Type type) {
-        IFeedMemoryComponent memoryComponent = null;
-        boolean valid = false;
-        switch (type) {
-            case COLLECTION:
-                valid = committed + START_COLLECTION_SIZE <= budget;
-                if (valid) {
-                    memoryComponent = new FrameCollection(componentId.incrementAndGet(), this, START_COLLECTION_SIZE);
-                }
-                break;
-            case POOL:
-                valid = committed + START_POOL_SIZE <= budget;
-                if (valid) {
-                    memoryComponent = new DataBucketPool(componentId.incrementAndGet(), this, START_POOL_SIZE,
-                            frameSize);
-                }
-                committed += START_POOL_SIZE;
-                break;
-        }
-        if (!valid) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to allocate memory component of type" + type);
-            }
-        }
-        return valid ? memoryComponent : null;
-    }
-
-    @Override
-    public synchronized boolean expandMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        if (committed + ALLOCATION_INCREMENT > budget) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Memory budget " + budget + " is exhausted. Space left: " + (budget - committed)
-                        + " frames.");
-            }
-            return false;
-        } else {
-            memoryComponent.expand(ALLOCATION_INCREMENT);
-            committed += ALLOCATION_INCREMENT;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Expanded memory component " + memoryComponent + " by " + ALLOCATION_INCREMENT + " " + this);
-            }
-            return true;
-        }
-    }
-
-    @Override
-    public synchronized void releaseMemoryComponent(IFeedMemoryComponent memoryComponent) {
-        int delta = memoryComponent.getTotalAllocation();
-        committed -= delta;
-        memoryComponent.reset();
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Reset " + memoryComponent + " and reclaimed " + delta + " frames " + this);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMemoryManager  [" + nodeId + "]" + "(" + committed + "/" + budget + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java
deleted file mode 100644
index 34ae461..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedMetadataManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.Date;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedMetadataManager;
-import org.apache.asterix.external.feed.message.XAQLFeedMessage;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedMetadataManager implements IFeedMetadataManager {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMetadataManager.class.getName());
-
-    private final String nodeId;
-    private ARecordType recordType;
-
-    public FeedMetadataManager(String nodeId) throws AsterixException, HyracksDataException {
-        this.nodeId = nodeId;
-        String[] fieldNames = new String[] { "id", "dataverseName", "feedName", "targetDataset", "tuple", "message",
-                "timestamp" };
-        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
-
-        recordType = new ARecordType(FeedConstants.FAILED_TUPLE_DATASET_TYPE, fieldNames, fieldTypes, true);
-    }
-
-    @Override
-    public void logTuple(FeedConnectionId connectionId, String tuple, String message, IFeedManager feedManager)
-            throws AsterixException {
-        try {
-            AString id = new AString("1");
-            AString dataverseValue = new AString(connectionId.getFeedId().getDataverse());
-            AString feedValue = new AString(connectionId.getFeedId().getFeedName());
-            AString targetDatasetValue = new AString(connectionId.getDatasetName());
-            AString tupleValue = new AString(tuple);
-            AString messageValue = new AString(message);
-            AString dateTime = new AString(new Date().toString());
-
-            IAObject[] fields = new IAObject[] { id, dataverseValue, feedValue, targetDatasetValue, tupleValue,
-                    messageValue, dateTime };
-            ARecord record = new ARecord(recordType, fields);
-            StringBuilder builder = new StringBuilder();
-            builder.append("use dataverse " + FeedConstants.FEEDS_METADATA_DV + ";" + "\n");
-            builder.append("insert into dataset " + FeedConstants.FAILED_TUPLE_DATASET + " ");
-            builder.append(" (" + recordToString(record) + ")");
-            builder.append(";");
-
-            XAQLFeedMessage xAqlMessage = new XAQLFeedMessage(connectionId, builder.toString());
-            feedManager.getFeedMessageService().sendMessage(xAqlMessage);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(" Sent " + xAqlMessage.toJSON());
-            }
-        } catch (Exception pe) {
-            throw new AsterixException(pe);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "FeedMetadataManager [" + nodeId + "]";
-    }
-
-    private String recordToString(ARecord record) {
-        String[] fieldNames = record.getType().getFieldNames();
-        StringBuilder sb = new StringBuilder();
-        sb.append("{ ");
-        for (int i = 0; i < fieldNames.length; i++) {
-            if (i > 0) {
-                sb.append(", ");
-            }
-            sb.append("\"" + fieldNames[i] + "\"");
-            sb.append(": ");
-            switch (record.getType().getFieldTypes()[i].getTypeTag()) {
-                case STRING:
-                    sb.append("\"" + ((AString) record.getValueByPos(i)).getStringValue() + "\"");
-                    break;
-                default:
-                    break;
-            }
-        }
-        sb.append(" }");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java
deleted file mode 100644
index e402f92..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedSubscriptionManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-
-public class FeedSubscriptionManager implements IFeedSubscriptionManager {
-
-    private static Logger LOGGER = Logger.getLogger(FeedSubscriptionManager.class.getName());
-
-    private final String nodeId;
-
-    private final Map<SubscribableFeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
-
-    public FeedSubscriptionManager(String nodeId) {
-        this.nodeId = nodeId;
-        this.subscribableRuntimes = new HashMap<SubscribableFeedRuntimeId, ISubscribableRuntime>();
-    }
-
-    @Override
-    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
-        SubscribableFeedRuntimeId sid = (SubscribableFeedRuntimeId) subscribableRuntime.getRuntimeId();
-        if (!subscribableRuntimes.containsKey(sid)) {
-            subscribableRuntimes.put(sid, subscribableRuntime);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered feed subscribable runtime " + subscribableRuntime);
-            }
-        } else {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Feed ingestion runtime " + subscribableRuntime + " already registered.");
-            }
-        }
-    }
-
-    @Override
-    public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableFeedRuntimeId) {
-        return subscribableRuntimes.get(subscribableFeedRuntimeId);
-    }
-
-    @Override
-    public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId ingestionId) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("De-registered feed subscribable runtime " + ingestionId);
-        }
-        subscribableRuntimes.remove(ingestionId);
-    }
-
-    @Override
-    public String toString() {
-        return "IngestionManager [" + nodeId + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java
deleted file mode 100644
index 6c924d2..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedCongestionMessage.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedConstants.MessageConstants;
-
-public class FeedCongestionMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private int inflowRate;
-    private int outflowRate;
-    private Mode mode;
-
-    public FeedCongestionMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, int inflowRate,
-            int outflowRate, Mode mode) {
-        super(MessageType.CONGESTION);
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.inflowRate = inflowRate;
-        this.outflowRate = outflowRate;
-        this.mode = mode;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
-        obj.put(FeedConstants.MessageConstants.OPERAND_ID, runtimeId.getOperandId());
-        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
-        obj.put(FeedConstants.MessageConstants.INFLOW_RATE, inflowRate);
-        obj.put(FeedConstants.MessageConstants.OUTFLOW_RATE, outflowRate);
-        obj.put(FeedConstants.MessageConstants.MODE, mode);
-        return obj;
-    }
-
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    public int getInflowRate() {
-        return inflowRate;
-    }
-
-    public int getOutflowRate() {
-        return outflowRate;
-    }
-
-    public static FeedCongestionMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
-                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
-                obj.getInt(FeedConstants.MessageConstants.PARTITION),
-                obj.getString(FeedConstants.MessageConstants.OPERAND_ID));
-        Mode mode = Mode.valueOf(obj.getString(MessageConstants.MODE));
-        return new FeedCongestionMessage(connectionId, runtimeId,
-                obj.getInt(FeedConstants.MessageConstants.INFLOW_RATE),
-                obj.getInt(FeedConstants.MessageConstants.OUTFLOW_RATE), mode);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public Mode getMode() {
-        return mode;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java
deleted file mode 100644
index 13d6622..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageService.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.net.Socket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.feed.api.IFeedMessageService;
-import org.apache.asterix.external.util.FeedConstants;
-
-/**
- * Sends feed report messages on behalf of an operator instance
- * to the SuperFeedManager associated with the feed.
- */
-public class FeedMessageService implements IFeedMessageService {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
-
-    private final LinkedBlockingQueue<String> inbox;
-    private final FeedMessageHandler mesgHandler;
-    private final String nodeId;
-    private ExecutorService executor;
-
-    public FeedMessageService(AsterixFeedProperties feedProperties, String nodeId, String ccClusterIp) {
-        this.inbox = new LinkedBlockingQueue<String>();
-        this.mesgHandler = new FeedMessageHandler(inbox, ccClusterIp, feedProperties.getFeedCentralManagerPort());
-        this.nodeId = nodeId;
-        this.executor = Executors.newSingleThreadExecutor();
-    }
-
-    public void start() throws Exception {
-
-        executor.execute(mesgHandler);
-    }
-
-    public void stop() {
-        synchronized (mesgHandler.getLock()) {
-            executor.shutdownNow();
-        }
-        mesgHandler.stop();
-    }
-
-    @Override
-    public void sendMessage(IFeedMessage message) {
-        try {
-            JSONObject obj = message.toJSON();
-            obj.put(FeedConstants.MessageConstants.NODE_ID, nodeId);
-            obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, message.getMessageType().name());
-            inbox.add(obj.toString());
-        } catch (JSONException jse) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("JSON exception in parsing message " + message + " exception [" + jse.getMessage() + "]");
-            }
-        }
-    }
-
-    private static class FeedMessageHandler implements Runnable {
-
-        private final LinkedBlockingQueue<String> inbox;
-        private final String host;
-        private final int port;
-        private final Object lock;
-
-        private Socket cfmSocket;
-
-        private static final byte[] EOL = "\n".getBytes();
-
-        public FeedMessageHandler(LinkedBlockingQueue<String> inbox, String host, int port) {
-            this.inbox = inbox;
-            this.host = host;
-            this.port = port;
-            this.lock = new Object();
-        }
-
-        public void run() {
-            try {
-                cfmSocket = new Socket(host, port);
-                if (cfmSocket != null) {
-                    while (true) {
-                        String message = inbox.take();
-                        synchronized (lock) { // lock prevents message handler from sending incomplete message midst shutdown attempt
-                            cfmSocket.getOutputStream().write(message.getBytes());
-                            cfmSocket.getOutputStream().write(EOL);
-                        }
-                    }
-                } else {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to start feed message service");
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
-                }
-            } finally {
-                stop();
-            }
-
-        }
-
-        public void stop() {
-            if (cfmSocket != null) {
-                try {
-                    cfmSocket.close();
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in closing socket " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public Object getLock() {
-            return lock;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java
deleted file mode 100644
index 1b8c45d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedReportMessage.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedMetricCollector.ValueType;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.asterix.external.util.FeedConstants;
-import org.apache.asterix.external.util.FeedConstants.MessageConstants;
-
-public class FeedReportMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final ValueType valueType;
-    private int value;
-
-    public FeedReportMessage(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType, int value) {
-        super(MessageType.FEED_REPORT);
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.valueType = valueType;
-        this.value = value;
-    }
-
-    public void reset(int value) {
-        this.value = value;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.RUNTIME_TYPE, runtimeId.getFeedRuntimeType());
-        obj.put(FeedConstants.MessageConstants.PARTITION, runtimeId.getPartition());
-        obj.put(FeedConstants.MessageConstants.VALUE_TYPE, valueType);
-        obj.put(FeedConstants.MessageConstants.VALUE, value);
-        return obj;
-    }
-
-    public static FeedReportMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.valueOf(obj
-                .getString(FeedConstants.MessageConstants.RUNTIME_TYPE)),
-                obj.getInt(FeedConstants.MessageConstants.PARTITION), FeedConstants.MessageConstants.NOT_APPLICABLE);
-        ValueType type = ValueType.valueOf(obj.getString(MessageConstants.VALUE_TYPE));
-        int value = Integer.parseInt(obj.getString(MessageConstants.VALUE));
-        return new FeedReportMessage(connectionId, runtimeId, type, value);
-    }
-
-    public int getValue() {
-        return value;
-    }
-
-    public void setValue(int value) {
-        this.value = value;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public FeedRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
-    public ValueType getValueType() {
-        return valueType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java
deleted file mode 100644
index 61e26de..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitAckMessage.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.util.FeedConstants;
-
-public class FeedTupleCommitAckMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private int intakePartition;
-    private int base;
-    private byte[] commitAcks;
-
-    public FeedTupleCommitAckMessage(FeedConnectionId connectionId, int intakePartition, int base, byte[] commitAcks) {
-        super(MessageType.COMMIT_ACK);
-        this.connectionId = connectionId;
-        this.intakePartition = intakePartition;
-        this.base = base;
-        this.commitAcks = commitAcks;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-        obj.put(FeedConstants.MessageConstants.BASE, base);
-        String commitAcksString = DatatypeConverter.printBase64Binary(commitAcks);
-        obj.put(FeedConstants.MessageConstants.COMMIT_ACKS, commitAcksString);
-        return obj;
-    }
-
-    public static FeedTupleCommitAckMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
-        int base = obj.getInt(FeedConstants.MessageConstants.BASE);
-        String commitAcksString = obj.getString(FeedConstants.MessageConstants.COMMIT_ACKS);
-        byte[] commitAcks = DatatypeConverter.parseBase64Binary(commitAcksString);
-        return new FeedTupleCommitAckMessage(connectionId, intakePartition, base, commitAcks);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getIntakePartition() {
-        return intakePartition;
-    }
-
-    public byte[] getCommitAcks() {
-        return commitAcks;
-    }
-
-    public void reset(int intakePartition, int base, byte[] commitAcks) {
-        this.intakePartition = intakePartition;
-        this.base = base;
-        this.commitAcks = commitAcks;
-    }
-
-    public int getBase() {
-        return base;
-    }
-
-    public void setBase(int base) {
-        this.base = base;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java
deleted file mode 100644
index a61dc06..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedTupleCommitResponseMessage.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.util.FeedConstants;
-
-public class FeedTupleCommitResponseMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-    private final int intakePartition;
-    private final int maxWindowAcked;
-
-    public FeedTupleCommitResponseMessage(FeedConnectionId connectionId, int intakePartition, int maxWindowAcked) {
-        super(MessageType.COMMIT_ACK_RESPONSE);
-        this.connectionId = connectionId;
-        this.intakePartition = intakePartition;
-        this.maxWindowAcked = maxWindowAcked;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.INTAKE_PARTITION, intakePartition);
-        obj.put(FeedConstants.MessageConstants.MAX_WINDOW_ACKED, maxWindowAcked);
-        return obj;
-    }
-
-    @Override
-    public String toString() {
-        return connectionId + "[" + intakePartition + "]" + "(" + maxWindowAcked + ")";
-    }
-
-    public static FeedTupleCommitResponseMessage read(JSONObject obj) throws JSONException {
-        FeedId feedId = new FeedId(obj.getString(FeedConstants.MessageConstants.DATAVERSE),
-                obj.getString(FeedConstants.MessageConstants.FEED));
-        FeedConnectionId connectionId = new FeedConnectionId(feedId,
-                obj.getString(FeedConstants.MessageConstants.DATASET));
-        int intakePartition = obj.getInt(FeedConstants.MessageConstants.INTAKE_PARTITION);
-        int maxWindowAcked = obj.getInt(FeedConstants.MessageConstants.MAX_WINDOW_ACKED);
-        return new FeedTupleCommitResponseMessage(connectionId, intakePartition, maxWindowAcked);
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getMaxWindowAcked() {
-        return maxWindowAcked;
-    }
-
-    public int getIntakePartition() {
-        return intakePartition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java
deleted file mode 100644
index 67f2884..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageListener.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class MessageListener {
-
-    private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
-
-    private int port;
-    private final LinkedBlockingQueue<String> outbox;
-
-    private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    private MessageListenerServer listenerServer;
-
-    public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
-        this.port = port;
-        this.outbox = outbox;
-    }
-
-    public void stop() {
-        listenerServer.stop();
-        if (!executorService.isShutdown()) {
-            executorService.shutdownNow();
-        }
-    }
-
-    public void start() throws IOException {
-        listenerServer = new MessageListenerServer(port, outbox);
-        executorService.execute(listenerServer);
-    }
-
-    private static class MessageListenerServer implements Runnable {
-
-        private final int port;
-        private final LinkedBlockingQueue<String> outbox;
-        private ServerSocket server;
-
-        private static final char EOL = (char) "\n".getBytes()[0];
-
-        public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
-            this.port = port;
-            this.outbox = outbox;
-        }
-
-        public void stop() {
-            try {
-                server.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void run() {
-            Socket client = null;
-            try {
-                server = new ServerSocket(port);
-                client = server.accept();
-                InputStream in = client.getInputStream();
-                CharBuffer buffer = CharBuffer.allocate(5000);
-                char ch;
-                while (true) {
-                    ch = (char) in.read();
-                    if (((int) ch) == -1) {
-                        break;
-                    }
-                    while (ch != EOL) {
-                        buffer.put(ch);
-                        ch = (char) in.read();
-                    }
-                    buffer.flip();
-                    String s = new String(buffer.array());
-                    synchronized (outbox) {
-                        outbox.add(s + "\n");
-                    }
-                    buffer.position(0);
-                    buffer.limit(5000);
-                }
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to start Message listener" + server);
-                }
-            } finally {
-                if (server != null) {
-                    try {
-                        server.close();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
deleted file mode 100644
index 6ed176a..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IMessageReceiver;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
-
-    protected static final Logger LOGGER = Logger.getLogger(MessageReceiver.class.getName());
-
-    protected final ArrayBlockingQueue<T> inbox;
-    protected ExecutorService executor;
-
-    public MessageReceiver() {
-        inbox = new ArrayBlockingQueue<T>(2);
-    }
-
-    public abstract void processMessage(T message) throws Exception;
-
-    @Override
-    public void start() {
-        executor = Executors.newSingleThreadExecutor();
-        executor.execute(new MessageReceiverRunnable<T>(this));
-    }
-
-    @Override
-    public synchronized void sendMessage(T message) throws InterruptedException {
-        inbox.put(message);
-    }
-
-    @Override
-    public void close(boolean processPending) {
-        if (executor != null) {
-            executor.shutdown();
-            executor = null;
-            if (processPending) {
-                flushPendingMessages();
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Will discard the pending frames " + inbox.size());
-                }
-            }
-        }
-    }
-
-    private static class MessageReceiverRunnable<T> implements Runnable {
-
-        private final ArrayBlockingQueue<T> inbox;
-        private final MessageReceiver<T> messageReceiver;
-
-        public MessageReceiverRunnable(MessageReceiver<T> messageReceiver) {
-            this.inbox = messageReceiver.inbox;
-            this.messageReceiver = messageReceiver;
-        }
-        // TODO: this should handle exceptions better
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    T message = inbox.poll();
-                    if (message == null) {
-                        messageReceiver.emptyInbox();
-                        message = inbox.take();
-                    }
-                    messageReceiver.processMessage(message);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    protected void flushPendingMessages() {
-        while (!inbox.isEmpty()) {
-            T message = null;
-            try {
-                message = inbox.take();
-                processMessage(message);
-            } catch (InterruptedException ie) {
-                // ignore exception but break from the loop
-                break;
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception " + e + " in processing message " + message);
-                }
-            }
-        }
-    }
-
-    public abstract void emptyInbox() throws HyracksDataException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java
deleted file mode 100644
index 1548d6d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/NodeReportMessage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.api.IFeedMessage;
-import org.apache.asterix.external.util.FeedConstants;
-
-public class NodeReportMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private double cpuLoad;
-    private double usedHeap;
-    private int nRuntimes;
-
-    public NodeReportMessage(float cpuLoad, long usedHeap, int nRuntimes) {
-        super(IFeedMessage.MessageType.NODE_REPORT);
-        this.usedHeap = usedHeap;
-        this.cpuLoad = cpuLoad;
-        this.nRuntimes = nRuntimes;
-    }
-
-    public void reset(double cpuLoad, double usedHeap, int nRuntimes) {
-        this.cpuLoad = cpuLoad;
-        this.usedHeap = usedHeap;
-        this.nRuntimes = nRuntimes;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.CPU_LOAD, cpuLoad);
-        obj.put(FeedConstants.MessageConstants.HEAP_USAGE, usedHeap);
-        obj.put(FeedConstants.MessageConstants.N_RUNTIMES, nRuntimes);
-        return obj;
-    }
-
-    public double getCpuLoad() {
-        return cpuLoad;
-    }
-
-    public double getUsedHeap() {
-        return usedHeap;
-    }
-
-    public int getnRuntimes() {
-        return nRuntimes;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java
deleted file mode 100644
index 76fe0c2..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/PrepareStallMessage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedConstants;
-
-/**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator involved in the feed pipeline.
- */
-public class PrepareStallMessage extends FeedMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    private final FeedConnectionId connectionId;
-
-    private final int computePartitionsRetainLimit;
-
-    public PrepareStallMessage(FeedConnectionId connectionId, int computePartitionsRetainLimit) {
-        super(MessageType.PREPARE_STALL);
-        this.connectionId = connectionId;
-        this.computePartitionsRetainLimit = computePartitionsRetainLimit;
-    }
-
-    @Override
-    public String toString() {
-        return MessageType.PREPARE_STALL.name() + "  " + connectionId;
-    }
-
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject obj = new JSONObject();
-        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
-        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
-        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
-        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
-        obj.put(FeedConstants.MessageConstants.COMPUTE_PARTITION_RETAIN_LIMIT, computePartitionsRetainLimit);
-        return obj;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public int getComputePartitionsRetainLimit() {
-        return computePartitionsRetainLimit;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
deleted file mode 100644
index 0749f82..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/RemoteSocketMessageListener.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.message;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class RemoteSocketMessageListener {
-
-    private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());
-
-    private final String host;
-    private final int port;
-    private final LinkedBlockingQueue<String> outbox;
-    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    private RemoteMessageListenerServer listenerServer;
-
-    public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
-        this.host = host;
-        this.port = port;
-        this.outbox = outbox;
-    }
-
-    public void stop() {
-        if (!executorService.isShutdown()) {
-            executorService.shutdownNow();
-        }
-        listenerServer.stop();
-
-    }
-
-    public void start() throws IOException {
-        listenerServer = new RemoteMessageListenerServer(host, port, outbox);
-        executorService.execute(listenerServer);
-    }
-
-    private static class RemoteMessageListenerServer implements Runnable {
-
-        private final String host;
-        private final int port;
-        private final LinkedBlockingQueue<String> outbox;
-        private Socket client;
-
-        public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
-            this.host = host;
-            this.port = port;
-            this.outbox = outbox;
-        }
-
-        public void stop() {
-            try {
-                client.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public void run() {
-            char EOL = (char) "\n".getBytes()[0];
-            Socket client = null;
-            try {
-                client = new Socket(host, port);
-                InputStream in = client.getInputStream();
-                CharBuffer buffer = CharBuffer.allocate(5000);
-                char ch;
-                while (true) {
-                    ch = (char) in.read();
-                    if ((ch) == -1) {
-                        break;
-                    }
-                    while (ch != EOL) {
-                        buffer.put(ch);
-                        ch = (char) in.read();
-                    }
-                    buffer.flip();
-                    String s = new String(buffer.array());
-                    synchronized (outbox) {
-                        outbox.add(s + "\n");
-                    }
-                    buffer.position(0);
-                    buffer.limit(5000);
-                }
-
-            } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to start Remote Message listener" + client);
-                }
-            } finally {
-                if (client != null && !client.isClosed()) {
-                    try {
-                        client.close();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        }
-    }
-
-    public static interface IMessageAnalyzer {
-
-        /**
-         * @return
-         */
-        public LinkedBlockingQueue<String> getMessageQueue();
-
-    }
-
-}