You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/15 19:04:00 UTC

[7/9] incubator-asterixdb git commit: Cleanup Feed CodeBase

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
deleted file mode 100644
index c35587c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedMetricCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-
-public interface IFeedMetricCollector {
-
-    public enum ValueType {
-        CPU_USAGE,
-        INFLOW_RATE,
-        OUTFLOW_RATE
-    }
-
-    public enum MetricType {
-        AVG,
-        RATE
-    }
-
-    public boolean sendReport(int senderId, int value);
-
-    public int getMetric(int senderId);
-
-    public int getMetric(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType);
-
-    int createReportSender(FeedConnectionId connectionId, FeedRuntimeId runtimeId, ValueType valueType,
-            MetricType metricType);
-
-    public void removeReportSender(int senderId);
-
-    public void resetReportSender(int senderId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
deleted file mode 100644
index a8d0552..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedOperatorOutputSideHandler.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-
-/**
- * Provides for output-side buffering for a feed runtime.
- * Right now, we only have a single output side handler
- * and we can probably remove it completely.
- *              ______
- *             |      |
- * ============|core  |============
- * ============| op   |============
- *             |______|^^^^^^^^^^^^
- *                     Output Side
- *                       Handler
- *
- **/
-public interface IFeedOperatorOutputSideHandler extends IFrameWriter {
-
-    public enum Type {
-        BASIC_FEED_OUTPUT_HANDLER,
-        DISTRIBUTE_FEED_OUTPUT_HANDLER,
-        COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER
-    }
-
-    public FeedId getFeedId();
-
-    public Type getType();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
deleted file mode 100644
index 9eced07..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedProvider.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedId;
-
-public interface IFeedProvider {
-
-    public void subscribeFeed(FeedId sourceDeedId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
index 269725d..b8375e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedRuntime.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.external.feed.api;
 
-import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrameWriter;
 
 public interface IFeedRuntime {
 
@@ -36,27 +34,14 @@ public interface IFeedRuntime {
     }
 
     public enum Mode {
-        PROCESS,
-        SPILL,
-        PROCESS_SPILL,
-        DISCARD,
-        POST_SPILL_DISCARD,
-        PROCESS_BACKLOG,
-        STALL,
-        FAIL,
-        END
+        PROCESS,            // Memory budget has not been consumed yet; memory is still available
+        SPILL,              // Memory budget has been consumed; now writing to disk
+        DISCARD             // Both the memory budget and the disk space budget have been consumed;
+                            // now discarding
     }
 
     /**
      * @return the unique runtime id associated with the feedRuntime
      */
     public FeedRuntimeId getRuntimeId();
-
-    /**
-     * @return the frame writer associated with the feed runtime.
-     */
-    public IFrameWriter getFeedFrameWriter();
-
-    public FeedRuntimeInputHandler getInputHandler();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
deleted file mode 100644
index 3d3e0e5..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedService.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-public interface IFeedService {
-
-    public void start() throws Exception;
-
-    public void stop();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
deleted file mode 100644
index ec4c396..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedSubscriptionManager.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-
-public interface IFeedSubscriptionManager {
-
-    /**
-     * @param subscribableRuntime
-     */
-    public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime);
-
-    /**
-     * @param subscribableRuntimeId
-     */
-    public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
-
-    /**
-     * @param subscribableRuntimeId
-     * @return
-     */
-    public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableRuntimeId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
deleted file mode 100644
index 6576e09..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedTrackingManager.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.FeedTupleCommitAckMessage;
-
-public interface IFeedTrackingManager {
-
-    public void submitAckReport(FeedTupleCommitAckMessage ackMessage);
-
-    public void disableAcking(FeedConnectionId connectionId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
deleted file mode 100644
index 647d847..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFrameEventCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-public interface IFrameEventCallback {
-
-    public enum FrameEvent {
-        FINISHED_PROCESSING,
-        PENDING_WORK_THRESHOLD_REACHED,
-        PENDING_WORK_DONE,
-        NO_OP,
-        FINISHED_PROCESSING_SPILLAGE
-    }
-
-    public void frameEvent(FrameEvent frameEvent);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
deleted file mode 100644
index eab7a64..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePostProcessor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public interface IFramePostProcessor {
-
-    public void postProcessFrame(ByteBuffer frame, FrameTupleAccessor frameAccessor);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
deleted file mode 100644
index 55461b7..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFramePreprocessor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.nio.ByteBuffer;
-
-public interface IFramePreprocessor {
-
-    public void preProcess(ByteBuffer frame) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
deleted file mode 100644
index 4848ed8..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IIntakeProgressTracker.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import java.util.Map;
-
-public interface IIntakeProgressTracker {
-
-    public void configure(Map<String, String> configuration);
-
-    public void notifyIngestedTupleTimestamp(long timestamp);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
deleted file mode 100644
index e58d99e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IMessageReceiver.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-public interface IMessageReceiver<T> {
-
-    public void sendMessage(T message) throws InterruptedException;
-
-    public void close(boolean processPending);
-
-    public void start();
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
index ee07188..1ca46ce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscribableRuntime.java
@@ -18,12 +18,8 @@
  */
 package org.apache.asterix.external.feed.api;
 
-import java.util.List;
-
-import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Represent a feed runtime whose output can be routed along other parallel path(s).
@@ -34,28 +30,12 @@ public interface ISubscribableRuntime extends IFeedRuntime {
      * @param collectionRuntime
      * @throws Exception
      */
-    public void subscribeFeed(FeedPolicyAccessor fpa, CollectionRuntime collectionRuntime) throws Exception;
+    public void subscribe(CollectionRuntime collectionRuntime) throws HyracksDataException;
 
     /**
      * @param collectionRuntime
+     * @throws InterruptedException
      * @throws Exception
      */
-    public void unsubscribeFeed(CollectionRuntime collectionRuntime) throws Exception;
-
-    /**
-     * @return
-     * @throws Exception
-     */
-    public List<ISubscriberRuntime> getSubscribers();
-
-    /**
-     * @return
-     */
-    public DistributeFeedFrameWriter getFeedFrameWriter();
-
-    /**
-     * @return
-     */
-    public RecordDescriptor getRecordDescriptor();
-
+    public void unsubscribe(CollectionRuntime collectionRuntime) throws HyracksDataException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
index 4d3e607..1752054 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriberRuntime.java
@@ -20,11 +20,8 @@ package org.apache.asterix.external.feed.api;
 
 import java.util.Map;
 
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
-
 public interface ISubscriberRuntime {
 
     public Map<String, String> getFeedPolicy();
 
-    public FeedFrameCollector getFrameCollector();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
deleted file mode 100644
index b94a52e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/ISubscriptionProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.external.feed.management.FeedId;
-
-public interface ISubscriptionProvider {
-
-    public void subscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
-
-    public void unsubscribeFeed(FeedId sourceFeedId, FeedId recipientFeedId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
deleted file mode 100644
index c4d5014..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-// Simply a delivery frame writer. I think we can simply get rid of this at some point {TODO}.
-public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
-
-    private final FeedConnectionId connectionId;                // [Dataverse - Feed - Dataset]
-    private IFrameWriter downstreamWriter;                      // Writer to next (Operator/Connector)
-    private final FrameTupleAccessor inputFrameTupleAccessor;   // Accessing input frame tuples
-    private final FrameTupleAppender tupleAppender;             // Append tuples to output frame
-    private final IFrame frame;                                 // Output frame
-
-    private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
-
-    public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
-            ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
-                    throws HyracksDataException {
-        this.connectionId = connectionId;
-        this.downstreamWriter = downstreamWriter;
-        inputFrameTupleAccessor = new FrameTupleAccessor(sourceRuntime.getRecordDescriptor());
-        frame = new VSizeFrame(ctx);
-        tupleAppender = new FrameTupleAppender(frame);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        downstreamWriter.open();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        // always project the first field only. why?
-        inputFrameTupleAccessor.reset(buffer);
-        int nTuple = inputFrameTupleAccessor.getTupleCount();
-        for (int t = 0; t < nTuple; t++) {
-            tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
-            appendTupleToFrame();
-            tupleBuilder.reset();
-        }
-    }
-
-    private void appendTupleToFrame() throws HyracksDataException {
-        if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                tupleBuilder.getSize())) {
-            FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
-            tupleAppender.reset(frame, true);
-            if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                    tupleBuilder.getSize())) {
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        downstreamWriter.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        downstreamWriter.close();
-    }
-
-    @Override
-    public FeedId getFeedId() {
-        return connectionId.getFeedId();
-    }
-
-    @Override
-    public Type getType() {
-        return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
-    }
-
-    public IFrameWriter getDownstreamWriter() {
-        return downstreamWriter;
-    }
-
-    public FeedConnectionId getConnectionId() {
-        return connectionId;
-    }
-
-    public void reset(IFrameWriter writer) {
-        this.downstreamWriter = writer;
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        tupleAppender.flush(downstreamWriter);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
deleted file mode 100644
index 7f63d1d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucket.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A {@DataBucket} is a wrapper around {@ByteBuffer} that expects certain number of receipients
- */
-public class DataBucket {
-
-    private static final AtomicInteger globalBucketId = new AtomicInteger(0);
-
-    private final ByteBuffer content;       // Content
-    private final AtomicInteger readCount;  // How many reads?
-    private final int bucketId;             // Id
-    private int desiredReadCount;           // Number of expected readers
-    private ContentType contentType;        // Data, End of stream, or End of spilled data
-    private final DataBucketPool pool;      // Pool of buckets
-
-    public enum ContentType {
-        DATA, // data (feed tuple)
-        EOD, // A signal indicating that there shall be no more data
-        EOSD // End of processing of spilled data
-    }
-
-    public DataBucket(DataBucketPool pool) {
-        this.content = ByteBuffer.allocate(pool.getFrameSize());
-        this.readCount = new AtomicInteger(0);
-        this.pool = pool;
-        this.contentType = ContentType.DATA;
-        this.bucketId = globalBucketId.incrementAndGet();
-    }
-
-    public synchronized void reset(ByteBuffer frame) {
-        if (frame != null) {
-            content.flip();
-            System.arraycopy(frame.array(), 0, content.array(), 0, frame.limit());
-            content.limit(frame.limit());
-            content.position(0);
-        }
-    }
-
-    public synchronized void doneReading() {
-        if (readCount.incrementAndGet() == desiredReadCount) {
-            readCount.set(0);
-            pool.returnDataBucket(this);
-        }
-    }
-
-    public void setDesiredReadCount(int rCount) {
-        this.desiredReadCount = rCount;
-    }
-
-    public ContentType getContentType() {
-        return contentType;
-    }
-
-    public void setContentType(ContentType contentType) {
-        this.contentType = contentType;
-    }
-
-    public synchronized ByteBuffer getContent() {
-        return content;
-    }
-
-    @Override
-    public String toString() {
-        return "DataBucket [" + bucketId + "]" + " (" + readCount + "," + desiredReadCount + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
deleted file mode 100644
index d1dea51..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DataBucketPool.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.util.Stack;
-
-import org.apache.asterix.external.feed.api.IFeedMemoryComponent;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
-
-/**
- * Represents a pool of reusable {@link DataBucket}
- */
-public class DataBucketPool implements IFeedMemoryComponent {
-
-    /** A unique identifier for the memory component **/
-    private final int componentId;
-
-    /** The {@link IFeedMemoryManager} for the NodeController **/
-    private final IFeedMemoryManager memoryManager;
-
-    /** A collection of available data buckets {@link DataBucket} **/
-    private final Stack<DataBucket> pool;
-
-    /** The total number of data buckets {@link DataBucket} allocated **/
-    private int totalAllocation;
-
-    /** The fixed frame size as configured for the asterix runtime **/
-    private final int frameSize;
-
-    public DataBucketPool(int componentId, IFeedMemoryManager memoryManager, int size, int frameSize) {
-        this.componentId = componentId;
-        this.memoryManager = memoryManager;
-        this.pool = new Stack<DataBucket>();
-        this.frameSize = frameSize;
-        expand(size);
-    }
-
-    public synchronized void returnDataBucket(DataBucket bucket) {
-        pool.push(bucket);
-    }
-
-    public synchronized DataBucket getDataBucket() {
-        if (pool.size() == 0) {
-            if (!memoryManager.expandMemoryComponent(this)) {
-                return null;
-            }
-        }
-        return pool.pop();
-    }
-
-    @Override
-    public Type getType() {
-        return Type.POOL;
-    }
-
-    @Override
-    public int getTotalAllocation() {
-        return totalAllocation;
-    }
-
-    @Override
-    public int getComponentId() {
-        return componentId;
-    }
-
-    @Override
-    public void expand(int delta) {
-        for (int i = 0; i < delta; i++) {
-            DataBucket bucket = new DataBucket(this);
-            pool.add(bucket);
-        }
-        totalAllocation += delta;
-    }
-
-    @Override
-    public void reset() {
-        totalAllocation -= pool.size();
-        pool.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "DataBucketPool" + "[" + componentId + "]" + "(" + totalAllocation + ")";
-    }
-
-    public int getSize() {
-        return pool.size();
-    }
-
-    public int getFrameSize() {
-        return frameSize;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index 8b7e2ba..f356899 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -20,17 +20,10 @@ package org.apache.asterix.external.feed.dataflow;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
-import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler.Type;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,8 +37,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
  **/
 public class DistributeFeedFrameWriter implements IFrameWriter {
 
-    private static final Logger LOGGER = Logger.getLogger(DistributeFeedFrameWriter.class.getName());
-
     /** A unique identifier for the feed to which the incoming tuples belong. **/
     private final FeedId feedId;
 
@@ -56,7 +47,7 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
     private final FrameDistributor frameDistributor;
 
     /** The original frame writer instantiated as part of job creation **/
-    private IFrameWriter writer;
+    private final IFrameWriter writer;
 
     /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
     private final FeedRuntimeType feedRuntimeType;
@@ -65,11 +56,9 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
     private final int partition;
 
     public DistributeFeedFrameWriter(IHyracksTaskContext ctx, FeedId feedId, IFrameWriter writer,
-            FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta, IFeedManager feedManager)
-            throws IOException {
+            FeedRuntimeType feedRuntimeType, int partition, FrameTupleAccessor fta) throws IOException {
         this.feedId = feedId;
-        this.frameDistributor = new FrameDistributor(feedId, feedRuntimeType, partition, true,
-                feedManager.getFeedMemoryManager(), fta);
+        this.frameDistributor = new FrameDistributor();
         this.feedRuntimeType = feedRuntimeType;
         this.partition = partition;
         this.writer = writer;
@@ -78,38 +67,19 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
     /**
      * @param fpa
      *            Feed policy accessor
-     * @param frameWriter
+     * @param nextOnlyWriter
      *            the writer which will deliver the buffers
      * @param connectionId
      *            (Dataverse - Dataset - Feed)
      * @return A frame collector.
-     * @throws Exception
+     * @throws HyracksDataException
      */
-    public FeedFrameCollector subscribeFeed(FeedPolicyAccessor fpa, IFrameWriter frameWriter,
-            FeedConnectionId connectionId) throws Exception {
-        FeedFrameCollector collector = null;
-        if (!frameDistributor.isRegistered(frameWriter)) {
-            collector = new FeedFrameCollector(frameDistributor, fpa, frameWriter, connectionId);
-            frameDistributor.registerFrameCollector(collector);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Registered subscriber, new mode " + frameDistributor.getMode());
-            }
-            return collector;
-        } else {
-            throw new IllegalStateException("subscriber " + feedId + " already registered");
-        }
+    public void subscribe(FeedFrameCollector collector) throws HyracksDataException {
+        frameDistributor.registerFrameCollector(collector);
     }
 
-    public void unsubscribeFeed(IFrameWriter recipientFeedFrameWriter) throws Exception {
-        boolean success = frameDistributor.deregisterFrameCollector(recipientFeedFrameWriter);
-        if (!success) {
-            throw new IllegalStateException(
-                    "Invalid attempt to unregister FeedFrameWriter " + recipientFeedFrameWriter + " not registered.");
-        }
-    }
-
-    public void notifyEndOfFeed() throws InterruptedException {
-        frameDistributor.notifyEndOfFeed();
+    public void unsubscribeFeed(FeedConnectionId connectionId) throws HyracksDataException {
+        frameDistributor.deregisterFrameCollector(connectionId);
     }
 
     @Override
@@ -136,27 +106,11 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
         writer.open();
     }
 
-    public Map<IFrameWriter, FeedFrameCollector> getRegisteredReaders() {
-        return frameDistributor.getRegisteredReaders();
-    }
-
-    public void setWriter(IFrameWriter writer) {
-        this.writer = writer;
-    }
-
-    public Type getType() {
-        return IFeedOperatorOutputSideHandler.Type.DISTRIBUTE_FEED_OUTPUT_HANDLER;
-    }
-
     @Override
     public String toString() {
         return feedId.toString() + feedRuntimeType + "[" + partition + "]";
     }
 
-    public FrameDistributor.DistributionMode getDistributionMode() {
-        return frameDistributor.getDistributionMode();
-    }
-
     @Override
     public void flush() throws HyracksDataException {
         frameDistributor.flush();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
deleted file mode 100644
index 447c4bc..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedCollectRuntimeInputHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedCollectRuntimeInputHandler extends FeedRuntimeInputHandler {
-
-    private final FeedFrameCache feedFrameCache;
-
-    public FeedCollectRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId,
-            FeedRuntimeId runtimeId, IFrameWriter coreOperator, FeedPolicyAccessor fpa, boolean bufferingEnabled,
-            FrameTupleAccessor fta, RecordDescriptor recordDesc, IFeedManager feedManager, int nPartitions)
-            throws IOException {
-        super(ctx, connectionId, runtimeId, coreOperator, fpa, bufferingEnabled, fta, recordDesc, feedManager,
-                nPartitions);
-        this.feedFrameCache = new FeedFrameCache(ctx, fta, coreOperator);
-    }
-
-    public void replayFrom(int recordId) throws HyracksDataException {
-        feedFrameCache.replayRecords(recordId);
-    }
-
-    public void dropTill(int recordId) {
-        feedFrameCache.dropTillRecordId(recordId);
-    }
-
-    public void replayCached() throws HyracksDataException {
-        feedFrameCache.replayAll();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
index f102f93..b98f123 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedExceptionHandler.java
@@ -24,12 +24,9 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.FrameDataException;
-import org.apache.asterix.external.feed.api.IExceptionHandler;
-import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.common.exceptions.IExceptionHandler;
 import org.apache.asterix.external.util.FeedFrameUtil;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
@@ -37,12 +34,11 @@ public class FeedExceptionHandler implements IExceptionHandler {
 
     private static Logger LOGGER = Logger.getLogger(FeedExceptionHandler.class.getName());
 
-    //TODO: Enable logging
+    // TODO: Enable logging
     private final IHyracksTaskContext ctx;
     private final FrameTupleAccessor fta;
 
-    public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta, RecordDescriptor recordDesc,
-            IFeedManager feedManager, FeedConnectionId connectionId) {
+    public FeedExceptionHandler(IHyracksTaskContext ctx, FrameTupleAccessor fta) {
         this.ctx = ctx;
         this.fta = fta;
     }
@@ -53,20 +49,21 @@ public class FeedExceptionHandler implements IExceptionHandler {
     }
 
     @Override
-    public ByteBuffer handleException(Exception e, ByteBuffer frame) {
+    public ByteBuffer handle(HyracksDataException th, ByteBuffer frame) {
         try {
-            if (e instanceof FrameDataException) {
+            if (th instanceof FrameDataException) {
                 fta.reset(frame);
-                FrameDataException fde = (FrameDataException) e;
+                FrameDataException fde = (FrameDataException) th;
                 int tupleIndex = fde.getTupleIndex();
                 try {
-                    logExceptionCausingTuple(tupleIndex, e);
+                    logExceptionCausingTuple(tupleIndex, th);
                 } catch (Exception ex) {
-                    ex.addSuppressed(e);
+                    ex.addSuppressed(th);
                     if (LOGGER.isLoggable(Level.WARNING)) {
                         LOGGER.warning("Unable to log exception causing tuple due to..." + ex.getMessage());
                     }
                 }
+                // TODO: Improve removeBadTuple. Right now, it creates lots of objects
                 return FeedFrameUtil.removeBadTuple(ctx, tupleIndex, fta);
             } else {
                 return null;
@@ -80,6 +77,8 @@ public class FeedExceptionHandler implements IExceptionHandler {
         }
     }
 
-    private void logExceptionCausingTuple(int tupleIndex, Exception e) throws HyracksDataException, AsterixException {
+    // TODO: Fix logging of exceptions
+    private void logExceptionCausingTuple(int tupleIndex, Throwable e) throws HyracksDataException, AsterixException {
+        LOGGER.log(Level.WARNING, e.getMessage(), e);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
deleted file mode 100644
index 159bc43..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.asterix.external.util.FeedConstants.StatisticsConstants;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-/**
- * Allows caching of feed frames. This class is used in providing upstream backup.
- * The tuples at the intake layer are held in this cache until these are acked by
- * the storage layer post their persistence. On receiving an ack, appropriate tuples
- * (recordsId < ackedRecordId) are dropped from the cache.
- */
-public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
-
-    /**
-     * Value represents a cache feed frame
-     * Key represents the largest record Id in the frame.
-     * At the intake side, the largest record id corresponds to the last record in the frame
-     **/
-    private final Map<Integer, ByteBuffer> orderedCache;
-    private final FrameTupleAccessor tupleAccessor;
-    private final IFrameWriter frameWriter;
-    private final IHyracksTaskContext ctx;
-
-    public FeedFrameCache(IHyracksTaskContext ctx, FrameTupleAccessor tupleAccessor, IFrameWriter frameWriter) {
-        this.tupleAccessor = tupleAccessor;
-        this.frameWriter = frameWriter;
-        /** A LinkedHashMap ensures entries are retrieved in order of their insertion **/
-        this.orderedCache = new LinkedHashMap<Integer, ByteBuffer>();
-        this.ctx = ctx;
-    }
-
-    @Override
-    public void processMessage(ByteBuffer frame) throws Exception {
-        int lastRecordId = getLastRecordId(frame);
-        ByteBuffer clone = cloneFrame(frame);
-        orderedCache.put(lastRecordId, clone);
-    }
-
-    public void dropTillRecordId(int recordId) {
-        List<Integer> dropRecordIds = new ArrayList<Integer>();
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            int recId = entry.getKey();
-            if (recId <= recordId) {
-                dropRecordIds.add(recId);
-            } else {
-                break;
-            }
-        }
-        for (Integer r : dropRecordIds) {
-            orderedCache.remove(r);
-        }
-    }
-
-    public void replayRecords(int startingRecordId) throws HyracksDataException {
-        boolean replayPositionReached = false;
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            // the key increases monotonically
-            int maxRecordIdInFrame = entry.getKey();
-            if (!replayPositionReached) {
-                if (startingRecordId < maxRecordIdInFrame) {
-                    replayFrame(startingRecordId, entry.getValue());
-                    break;
-                } else {
-                    continue;
-                }
-            }
-        }
-    }
-
-    /**
-     * Replay the frame from the tuple (inclusive) with recordId as specified.
-     *
-     * @param recordId
-     * @param frame
-     * @throws HyracksDataException
-     */
-    private void replayFrame(int recordId, ByteBuffer frame) throws HyracksDataException {
-        tupleAccessor.reset(frame);
-        int nTuples = tupleAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; i++) {
-            int rid = getRecordIdAtTupleIndex(i, frame);
-            if (rid == recordId) {
-                ByteBuffer slicedFrame = splitFrame(i, frame);
-                replayFrame(slicedFrame);
-                break;
-            }
-        }
-    }
-
-    private ByteBuffer splitFrame(int beginTupleIndex, ByteBuffer frame) throws HyracksDataException {
-        IFrame slicedFrame = new VSizeFrame(ctx);
-        FrameTupleAppender appender = new FrameTupleAppender();
-        appender.reset(slicedFrame, true);
-        int totalTuples = tupleAccessor.getTupleCount();
-        for (int ti = beginTupleIndex; ti < totalTuples; ti++) {
-            appender.append(tupleAccessor, ti);
-        }
-        return slicedFrame.getBuffer();
-    }
-
-    /**
-     * Replay the frame
-     *
-     * @param frame
-     * @throws HyracksDataException
-     */
-    private void replayFrame(ByteBuffer frame) throws HyracksDataException {
-        frameWriter.nextFrame(frame);
-    }
-
-    private int getLastRecordId(ByteBuffer frame) {
-        tupleAccessor.reset(frame);
-        int nTuples = tupleAccessor.getTupleCount();
-        return getRecordIdAtTupleIndex(nTuples - 1, frame);
-    }
-
-    private int getRecordIdAtTupleIndex(int tupleIndex, ByteBuffer frame) {
-        tupleAccessor.reset(frame);
-        int recordStart = tupleAccessor.getTupleStartOffset(tupleIndex) + tupleAccessor.getFieldSlotsLength();
-        int openPartOffset = frame.getInt(recordStart + 6);
-        int numOpenFields = frame.getInt(recordStart + openPartOffset);
-        int recordIdOffset = frame.getInt(recordStart + openPartOffset + 4 + numOpenFields * 8
-                + StatisticsConstants.INTAKE_TUPLEID.length() + 2 + 1);
-        int lastRecordId = frame.getInt(recordStart + recordIdOffset);
-        return lastRecordId;
-    }
-
-    private ByteBuffer cloneFrame(ByteBuffer frame) {
-        ByteBuffer clone = ByteBuffer.allocate(frame.capacity());
-        System.arraycopy(frame.array(), 0, clone.array(), 0, frame.limit());
-        return clone;
-    }
-
-    public void replayAll() throws HyracksDataException {
-        for (Entry<Integer, ByteBuffer> entry : orderedCache.entrySet()) {
-            ByteBuffer frame = entry.getValue();
-            frameWriter.nextFrame(frame);
-        }
-    }
-
-    @Override
-    public void emptyInbox() throws HyracksDataException {
-        frameWriter.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
index ef4b87d..b81f59a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
@@ -19,21 +19,16 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
-import java.util.logging.Level;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.message.MessageReceiver;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FeedFrameCollector extends MessageReceiver<DataBucket> {
+public class FeedFrameCollector implements IFrameWriter {
 
     private final FeedConnectionId connectionId;
-    private final FrameDistributor frameDistributor;
-    private FeedPolicyAccessor fpa;
-    private IFrameWriter frameWriter;
+    private IFrameWriter writer;
     private State state;
 
     public enum State {
@@ -43,70 +38,27 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
         HANDOVER
     }
 
-    public FeedFrameCollector(FrameDistributor frameDistributor, FeedPolicyAccessor feedPolicyAccessor,
-            IFrameWriter frameWriter, FeedConnectionId connectionId) {
-        super();
-        this.frameDistributor = frameDistributor;
-        this.fpa = feedPolicyAccessor;
+    public FeedFrameCollector(FeedPolicyAccessor feedPolicyAccessor, IFrameWriter writer,
+            FeedConnectionId connectionId) {
         this.connectionId = connectionId;
-        this.frameWriter = frameWriter;
+        this.writer = writer;
         this.state = State.ACTIVE;
     }
 
     @Override
-    public void processMessage(DataBucket bucket) throws Exception {
-        try {
-            ByteBuffer frame = bucket.getContent();
-            switch (bucket.getContentType()) {
-                case DATA:
-                    frameWriter.nextFrame(frame);
-                    break;
-                case EOD:
-                    closeCollector();
-                    break;
-                case EOSD:
-                    throw new AsterixException("Received data bucket with content of type " + bucket.getContentType());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to process data bucket " + bucket + ", encountered exception " + e.getMessage());
-            }
-        } finally {
-            bucket.doneReading();
-        }
-    }
-
-    public void closeCollector() {
-        if (state.equals(State.TRANSITION)) {
-            super.close(true);
-            setState(State.ACTIVE);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info(this + " is now " + State.ACTIVE + " mode, processing frames synchronously");
-            }
-        } else {
-            flushPendingMessages();
-            setState(State.FINISHED);
-            synchronized (frameDistributor.getRegisteredCollectors()) {
-                frameDistributor.getRegisteredCollectors().notifyAll();
-            }
-            disconnect();
-        }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Closed collector " + this);
-        }
+    public synchronized void close() throws HyracksDataException {
+        writer.close();
+        state = State.FINISHED;
+        notify();
     }
 
     public synchronized void disconnect() {
         setState(State.FINISHED);
     }
 
+    @Override
     public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameWriter.nextFrame(frame);
-    }
-
-    public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return fpa;
+        writer.nextFrame(frame);
     }
 
     public synchronized State getState() {
@@ -123,17 +75,14 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
             default:
                 break;
         }
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Frame Collector " + this.frameDistributor.getFeedRuntimeType() + " switched to " + state);
-        }
     }
 
     public IFrameWriter getFrameWriter() {
-        return frameWriter;
+        return writer;
     }
 
-    public void setFrameWriter(IFrameWriter frameWriter) {
-        this.frameWriter = frameWriter;
+    public void setFrameWriter(IFrameWriter writer) {
+        this.writer = writer;
     }
 
     @Override
@@ -158,12 +107,21 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
     }
 
     @Override
-    public void emptyInbox() throws HyracksDataException {
-        flush();
+    public synchronized void flush() throws HyracksDataException {
+        writer.flush();
     }
 
-    public synchronized void flush() throws HyracksDataException {
-        frameWriter.flush();
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
     }
 
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
deleted file mode 100644
index 53ee475..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameDiscarder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class FeedFrameDiscarder {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameSpiller.class.getName());
-
-    private final FeedRuntimeInputHandler inputHandler;
-    private final FeedConnectionId connectionId;
-    private final FeedRuntimeId runtimeId;
-    private final FeedPolicyAccessor policyAccessor;
-    private final float maxFractionDiscard;
-    private int nDiscarded;
-
-    public FeedFrameDiscarder(FeedConnectionId connectionId, FeedRuntimeId runtimeId, FeedPolicyAccessor policyAccessor,
-            FeedRuntimeInputHandler inputHandler) throws HyracksDataException {
-        this.connectionId = connectionId;
-        this.runtimeId = runtimeId;
-        this.policyAccessor = policyAccessor;
-        this.inputHandler = inputHandler;
-        this.maxFractionDiscard = policyAccessor.getMaxFractionDiscard();
-    }
-
-    public boolean processMessage(ByteBuffer message) {
-        if (policyAccessor.getMaxFractionDiscard() != 0) {
-            long nProcessed = inputHandler.getProcessed();
-            long discardLimit = (long) (nProcessed * maxFractionDiscard);
-            if (nDiscarded >= discardLimit) {
-                return false;
-            }
-            nDiscarded++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discarded frame by " + connectionId + " (" + runtimeId + ")" + " count so far  ("
-                        + nDiscarded + ") Limit [" + discardLimit + "]");
-            }
-            return true;
-        }
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/fba622b3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
deleted file mode 100644
index 2a6fd79..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.feed.api.IFeedFrameHandler;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.message.MessageReceiver;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-
-public class FeedFrameHandlers {
-
-    private static final Logger LOGGER = Logger.getLogger(FeedFrameHandlers.class.getName());
-
-    public enum RoutingMode {
-        IN_MEMORY_ROUTE,
-        SPILL_TO_DISK,
-        DISCARD
-    }
-
-    public static IFeedFrameHandler getFeedFrameHandler(FrameDistributor distributor, FeedId feedId,
-            RoutingMode routingMode, FeedRuntimeType runtimeType, int partition, int frameSize) throws IOException {
-        IFeedFrameHandler handler = null;
-        switch (routingMode) {
-            case IN_MEMORY_ROUTE:
-                handler = new InMemoryRouter(distributor.getRegisteredReaders().values(), runtimeType, partition);
-                break;
-            case SPILL_TO_DISK:
-                handler = new DiskSpiller(distributor, feedId, runtimeType, partition, frameSize);
-                break;
-            case DISCARD:
-                handler = new DiscardRouter(distributor, feedId, runtimeType, partition);
-                break;
-            default:
-                throw new IllegalArgumentException("Invalid routing mode" + routingMode);
-        }
-        return handler;
-    }
-
-    public static class DiscardRouter implements IFeedFrameHandler {
-
-        private final FeedId feedId;
-        private int nDiscarded;
-        private final FeedRuntimeType runtimeType;
-        private final int partition;
-        private final FrameDistributor distributor;
-
-        public DiscardRouter(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition)
-                throws HyracksDataException {
-            this.distributor = distributor;
-            this.feedId = feedId;
-            this.nDiscarded = 0;
-            this.runtimeType = runtimeType;
-            this.partition = partition;
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            FrameTupleAccessor fta = distributor.getFta();
-            fta.reset(frame);
-            int nTuples = fta.getTupleCount();
-            nDiscarded += nTuples;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discarded additional [" + runtimeType + "]" + "(" + partition + ")" + "  " + nTuples);
-            }
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            nDiscarded++;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Discard Count" + nDiscarded);
-            }
-        }
-
-        @Override
-        public void close() {
-            // do nothing, no resource to relinquish
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            throw new IllegalStateException("Invalid operation");
-        }
-
-        @Override
-        public String toString() {
-            return "DiscardRouter" + "[" + feedId + "]" + "(" + nDiscarded + ")";
-        }
-
-        @Override
-        public String getSummary() {
-            return new String("Number of discarded frames (since last reset)" + " feedId " + "[" + feedId + "]" + "("
-                    + nDiscarded + ")");
-        }
-
-    }
-
-    public static class InMemoryRouter implements IFeedFrameHandler {
-
-        private final Collection<FeedFrameCollector> frameCollectors;
-
-        public InMemoryRouter(Collection<FeedFrameCollector> frameCollectors, FeedRuntimeType runtimeType,
-                int partition) {
-            this.frameCollectors = frameCollectors;
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) throws InterruptedException {
-            for (FeedFrameCollector collector : frameCollectors) {
-                collector.sendMessage(bucket);
-            }
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public String getSummary() {
-            return "InMemoryRouter Summary";
-        }
-    }
-
-    public static class DiskSpiller implements IFeedFrameHandler {
-
-        private FrameSpiller<ByteBuffer> receiver;
-        private Iterator<ByteBuffer> iterator;
-
-        public DiskSpiller(FrameDistributor distributor, FeedId feedId, FeedRuntimeType runtimeType, int partition,
-                int frameSize) throws IOException {
-            receiver = new FrameSpiller<ByteBuffer>(distributor, feedId, frameSize);
-        }
-
-        @Override
-        public void handleFrame(ByteBuffer frame) throws HyracksDataException, InterruptedException {
-            receiver.sendMessage(frame);
-        }
-
-        private static class FrameSpiller<T> extends MessageReceiver<ByteBuffer> {
-
-            private final FeedId feedId;
-            private BufferedOutputStream bos;
-            private final ByteBuffer reusableLengthBuffer;
-            private final ByteBuffer reusableDataBuffer;
-            private long offset;
-            private File file;
-            private final FrameDistributor frameDistributor;
-            private boolean fileCreated = false;
-
-            public FrameSpiller(FrameDistributor distributor, FeedId feedId, int frameSize) throws IOException {
-                this.feedId = feedId;
-                this.frameDistributor = distributor;
-                reusableLengthBuffer = ByteBuffer.allocate(4);
-                reusableDataBuffer = ByteBuffer.allocate(frameSize);
-                this.offset = 0;
-            }
-
-            @Override
-            public void processMessage(ByteBuffer message) throws Exception {
-                if (!fileCreated) {
-                    createFile();
-                    fileCreated = true;
-                }
-                reusableLengthBuffer.flip();
-                reusableLengthBuffer.putInt(message.array().length);
-                bos.write(reusableLengthBuffer.array());
-                bos.write(message.array());
-            }
-
-            private void createFile() throws IOException {
-                Date date = new Date();
-                String dateSuffix = date.toString().replace(' ', '_');
-                String fileName = feedId.toString() + "_" + frameDistributor.getFeedRuntimeType() + "_"
-                        + frameDistributor.getPartition() + "_" + dateSuffix;
-
-                file = new File(fileName);
-                if (!file.exists()) {
-                    boolean success = file.createNewFile();
-                    if (!success) {
-                        throw new IOException("Unable to create spill file for feed " + feedId);
-                    }
-                }
-                bos = new BufferedOutputStream(new FileOutputStream(file));
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Created Spill File for feed " + feedId);
-                }
-            }
-
-            @SuppressWarnings("resource")
-            public Iterator<ByteBuffer> replayData() throws Exception {
-                final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
-                bis.skip(offset);
-                return new Iterator<ByteBuffer>() {
-
-                    @Override
-                    public boolean hasNext() {
-                        boolean more = false;
-                        try {
-                            more = bis.available() > 0;
-                            if (!more) {
-                                bis.close();
-                            }
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-
-                        return more;
-                    }
-
-                    @Override
-                    public ByteBuffer next() {
-                        reusableLengthBuffer.flip();
-                        try {
-                            bis.read(reusableLengthBuffer.array());
-                            reusableLengthBuffer.flip();
-                            int frameSize = reusableLengthBuffer.getInt();
-                            reusableDataBuffer.flip();
-                            bis.read(reusableDataBuffer.array(), 0, frameSize);
-                            offset += 4 + frameSize;
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-                        return reusableDataBuffer;
-                    }
-
-                    @Override
-                    public void remove() {
-                    }
-
-                };
-            }
-
-            @Override
-            public void emptyInbox() throws HyracksDataException {
-            }
-
-        }
-
-        @Override
-        public void handleDataBucket(DataBucket bucket) {
-            throw new IllegalStateException("Operation not supported");
-        }
-
-        @Override
-        public void close() {
-            receiver.close(true);
-        }
-
-        @Override
-        public Iterator<ByteBuffer> replayData() throws HyracksDataException {
-            try {
-                iterator = receiver.replayData();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-            return iterator;
-        }
-
-        //TODO: Form a summary that includes stats related to what has been spilled to disk
-        @Override
-        public String getSummary() {
-            return "Disk Spiller Summary";
-        }
-
-    }
-
-}