You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:32:05 UTC

[11/26] incubator-asterixdb git commit: Feed Fixes and Cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
deleted file mode 100644
index ae5c050..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feeds;
-
-import java.rmi.RemoteException;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-
-public class FeedPolicyEnforcer {
-
-    private final FeedConnectionId connectionId;
-    private final FeedPolicyAccessor policyAccessor;
-
-    public FeedPolicyEnforcer(FeedConnectionId feedConnectionId, Map<String, String> feedPolicy) {
-        this.connectionId = feedConnectionId;
-        this.policyAccessor = new FeedPolicyAccessor(feedPolicy);
-    }
-
-    public boolean continueIngestionPostSoftwareFailure(Exception e) throws RemoteException, ACIDException {
-        return policyAccessor.continueOnSoftFailure();
-    }
-
-    public FeedPolicyAccessor getFeedPolicyAccessor() {
-        return policyAccessor;
-    }
-
-    public FeedConnectionId getFeedId() {
-        return connectionId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
index 3b59b98..93ba0a0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
@@ -37,6 +37,7 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
     protected int bufferLength = 0;
     protected int bufferPosn = 0;
     protected IExternalIndexer indexer;
+    protected boolean done = false;
 
     @Override
     public IRawRecord<char[]> next() throws IOException {
@@ -45,7 +46,10 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
 
     @Override
     public void close() throws IOException {
-        reader.close();
+        if (!done) {
+            reader.close();
+        }
+        done = true;
     }
 
     public void setInputStream(AInputStream inputStream) throws IOException {
@@ -72,4 +76,15 @@ public abstract class AbstractStreamRecordReader implements IRecordReader<char[]
     public void setIndexer(IExternalIndexer indexer) {
         this.indexer = indexer;
     }
+
+    @Override
+    public boolean stop() {
+        try {
+            reader.stop();
+            return true;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
index 9b11df6..2b33d7a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
@@ -32,6 +32,9 @@ public class LineRecordReader extends AbstractStreamRecordReader {
 
     @Override
     public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
         /* We're reading data from in, but the head of the stream may be
          * already buffered in buffer, so we have several cases:
          * 1. No newline characters are in the buffer, so we need to copy
@@ -63,7 +66,7 @@ public class LineRecordReader extends AbstractStreamRecordReader {
                         recordNumber++;
                         return true;
                     }
-                    reader.close();
+                    close();
                     return false; //EOF
                 }
             }
@@ -92,11 +95,6 @@ public class LineRecordReader extends AbstractStreamRecordReader {
     }
 
     @Override
-    public boolean stop() {
-        return false;
-    }
-
-    @Override
     public void configure(Map<String, String> configuration) throws Exception {
         super.configure(configuration);
         if (ExternalDataUtils.hasHeader(configuration)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
index 668876e..49e67e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
@@ -44,6 +44,9 @@ public class QuotedLineRecordReader extends LineRecordReader {
 
     @Override
     public boolean hasNext() throws IOException {
+        if (done) {
+            return false;
+        }
         newlineLength = 0;
         prevCharCR = false;
         prevCharEscape = false;
@@ -65,6 +68,7 @@ public class QuotedLineRecordReader extends LineRecordReader {
                             recordNumber++;
                             return true;
                         }
+                        close();
                         return false;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
index 9864805..84c96d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
@@ -67,6 +67,9 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
 
     @Override
     public boolean hasNext() throws Exception {
+        if (done) {
+            return false;
+        }
         record.reset();
         boolean hasStarted = false;
         boolean hasFinished = false;
@@ -79,6 +82,7 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
                 startPosn = bufferPosn = 0;
                 bufferLength = reader.read(inputBuffer);
                 if (bufferLength <= 0) {
+                    close();
                     return false; // EOF
                 }
             }
@@ -142,6 +146,12 @@ public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
 
     @Override
     public boolean stop() {
-        return false;
+        try {
+            reader.stop();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
index e7c141d..3ce6a81 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
@@ -38,11 +38,16 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
     private LinkedBlockingQueue<Status> inputQ;
     private TwitterStream twitterStream;
     private GenericRecord<Status> record;
+    private boolean closed = false;
 
     @Override
     public void close() throws IOException {
-        twitterStream.clearListeners();
-        twitterStream.cleanUp();
+        if (!closed) {
+            twitterStream.clearListeners();
+            twitterStream.cleanUp();
+            twitterStream = null;
+            closed = true;
+        }
     }
 
     @Override
@@ -61,7 +66,7 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
 
     @Override
     public boolean hasNext() throws Exception {
-        return true;
+        return !closed;
     }
 
     @Override
@@ -81,7 +86,12 @@ public class TwitterPushRecordReader implements IRecordReader<Status> {
 
     @Override
     public boolean stop() {
-        return false;
+        try {
+            close();
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
     }
 
     private class TweetListener implements StatusListener {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
index 72aaa37..6840c11 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
@@ -97,7 +97,7 @@ public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status>
             pull = false;
         } else {
             throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
-                    + ExternalDataConstants.KEY_PUSH + "must be specified as part of adaptor configuration");
+                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index e573f74..7ba6032 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.stream;
 
+import java.io.IOException;
 import java.io.InputStreamReader;
 
 public class AInputStreamReader extends InputStreamReader {
@@ -31,4 +32,12 @@ public class AInputStreamReader extends InputStreamReader {
     public boolean skipError() throws Exception {
         return in.skipError();
     }
+
+    public void stop() throws IOException {
+        try {
+            in.stop();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
index b3ad1c3..8f4c094 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
@@ -63,7 +63,7 @@ public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implem
                 }
             } else if (value.getLength() == pos) {
                 pos++;
-                return ExternalDataConstants.EOL;
+                return ExternalDataConstants.BYTE_LF;
             }
             return value.getBytes()[pos++];
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
index b511617..22d0a87 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
@@ -19,38 +19,44 @@
 package org.apache.asterix.external.input.stream;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.file.Path;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class LocalFSInputStreamProvider implements IInputStreamProvider {
 
-    private FileSplit[] fileSplits;
-    private int partition;
+    private String expression;
+    private boolean isFeed;
+    private Path path;
+    private File feedLogFile;
 
     public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
-            Map<String, String> configuration, int partition) {
-        this.partition = partition;
-        this.fileSplits = fileSplits;
+            Map<String, String> configuration, int partition, String expression, boolean isFeed,
+            FileSplit[] feedLogFileSplits) {
+        this.expression = expression;
+        this.isFeed = isFeed;
+        this.path = fileSplits[partition].getLocalFile().getFile().toPath();
+        if (feedLogFileSplits != null) {
+            this.feedLogFile = FeedUtils
+                    .getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
+                            feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager())
+                    .getFile();
+
+        }
     }
 
     @Override
-    public AInputStream getInputStream() throws Exception {
-        FileSplit split = fileSplits[partition];
-        File inputFile = split.getLocalFile().getFile();
-        InputStream in;
-        try {
-            in = new FileInputStream(inputFile);
-            return new BasicInputStream(in);
-        } catch (FileNotFoundException e) {
-            throw new IOException(e);
+    public AInputStream getInputStream() throws IOException {
+        FeedLogManager feedLogManager = null;
+        if (isFeed && feedLogFile != null) {
+            feedLogManager = new FeedLogManager(feedLogFile);
         }
+        return new LocalFileSystemInputStream(path, expression, feedLogManager, isFeed);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
new file mode 100644
index 0000000..7eebe4c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class LocalFileSystemInputStream extends AInputStream {
+    private final FileSystemWatcher watcher;
+    private FileInputStream in;
+    private byte lastByte;
+
+    public LocalFileSystemInputStream(Path inputResource, String expression, FeedLogManager logManager, boolean isFeed)
+            throws IOException {
+        this.watcher = new FileSystemWatcher(logManager, inputResource, expression, isFeed);
+        this.watcher.init();
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException ioe = null;
+        if (in != null) {
+            try {
+                closeFile();
+            } catch (Exception e) {
+                ioe = new IOException(e);
+            }
+        }
+        try {
+            watcher.close();
+        } catch (Exception e) {
+            if (ioe == null) {
+                throw e;
+            }
+            ioe.addSuppressed(e);
+            throw ioe;
+        }
+    }
+
+    private void closeFile() throws IOException {
+        if (in != null) {
+            try {
+                in.close();
+            } finally {
+                in = null;
+            }
+        }
+    }
+
+    /**
+     * Closes the current input stream and opens the next one, if any.
+     */
+    private boolean advance() throws IOException {
+        closeFile();
+        if (watcher.hasNext()) {
+            in = new FileInputStream(watcher.next());
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+        throw new HyracksDataException(
+                "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (in == null) {
+            if (!advance()) {
+                return -1;
+            }
+        }
+        int result = in.read(b, off, len);
+        while (result < 0 && advance()) {
+            // return a new line at the end of every file <--Might create problems for some cases depending on the parser implementation-->
+            if (lastByte != ExternalDataConstants.BYTE_LF && lastByte != ExternalDataConstants.BYTE_LF) {
+                lastByte = ExternalDataConstants.BYTE_LF;
+                b[off] = ExternalDataConstants.BYTE_LF;
+                return 1;
+            }
+            // recursive call
+            result = in.read(b, off, len);
+        }
+        if (result > 0) {
+            lastByte = b[off + result - 1];
+        }
+        return result;
+    }
+
+    @Override
+    public boolean skipError() throws Exception {
+        advance();
+        return true;
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        watcher.close();
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
index d32a94f..7c64aa3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
@@ -30,7 +30,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.runtime.TweetGenerator;
+import org.apache.asterix.external.util.TweetGenerator;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index 14c712a..ab1f8a0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -29,8 +29,10 @@ import org.apache.asterix.external.api.IInputStreamProviderFactory;
 import org.apache.asterix.external.api.INodeResolver;
 import org.apache.asterix.external.api.INodeResolverFactory;
 import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
-import org.apache.asterix.external.util.DNSResolverFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.NodeResolverFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -41,15 +43,21 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
 
     private static final long serialVersionUID = 1L;
 
-    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
+    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new NodeResolverFactory().createNodeResolver();
     protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamProviderFactory.class.getName());
     protected static INodeResolver nodeResolver;
     protected Map<String, String> configuration;
-    protected FileSplit[] fileSplits;
+    protected FileSplit[] inputFileSplits;
+    protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
+    protected boolean isFeed;
+    protected String expression;
+    // transient fields (They don't need to be serialized and transferred)
+    private transient AlgebricksAbsolutePartitionConstraint constraints;
 
     @Override
     public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
-        return new LocalFSInputStreamProvider(fileSplits, ctx, configuration, partition);
+        return new LocalFSInputStreamProvider(inputFileSplits, ctx, configuration, partition, expression, isFeed,
+                feedLogFileSplits);
     }
 
     @Override
@@ -67,16 +75,23 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
         this.configuration = configuration;
         String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
         configureFileSplits(splits);
+        configurePartitionConstraint();
+        this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
+        if (isFeed) {
+            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
+                    ExternalDataUtils.getFeedName(configuration), constraints);
+        }
+        this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return configurePartitionConstraint();
+        return constraints;
     }
 
     private void configureFileSplits(String[] splits) throws AsterixException {
-        if (fileSplits == null) {
-            fileSplits = new FileSplit[splits.length];
+        if (inputFileSplits == null) {
+            inputFileSplits = new FileSplit[splits.length];
             String nodeName;
             String nodeLocalPath;
             int count = 0;
@@ -90,19 +105,19 @@ public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFa
                 nodeName = trimmedValue.split(":")[0];
                 nodeLocalPath = trimmedValue.split("://")[1];
                 FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
-                fileSplits[count++] = fileSplit;
+                inputFileSplits[count++] = fileSplit;
             }
         }
     }
 
-    private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
-        String[] locs = new String[fileSplits.length];
+    private void configurePartitionConstraint() throws AsterixException {
+        String[] locs = new String[inputFileSplits.length];
         String location;
-        for (int i = 0; i < fileSplits.length; i++) {
-            location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+        for (int i = 0; i < inputFileSplits.length; i++) {
+            location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
             locs[i] = location;
         }
-        return new AlgebricksAbsolutePartitionConstraint(locs);
+        constraints = new AlgebricksAbsolutePartitionConstraint(locs);
     }
 
     protected INodeResolver getNodeResolver() {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index e9c15cb..fd3d9e3 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -75,7 +75,7 @@ public abstract class ExternalFunction implements IExternalFunction {
         }
     }
 
-    public static ISerializerDeserializer getSerDe(Object typeInfo) {
+    public static ISerializerDeserializer<?> getSerDe(Object typeInfo) {
         return AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(typeInfo);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
index 192bd4e..b6795f6 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ResultCollector.java
@@ -117,6 +117,7 @@ public class ResultCollector implements IResultCollector {
         return reusableResultObjectHolder;
     }
 
+    @SuppressWarnings("unchecked")
     private void serializeResult(IAObject object) throws AsterixException {
         try {
             AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(finfo.getReturnType())

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
index 93b4bf1..e7c1ec1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectUtil.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
 import org.apache.asterix.external.api.IJObject;
@@ -65,7 +64,6 @@ public class JObjectUtil {
     /**
      * Normalize an input string by removing linebreaks, and replace them with space
      * Also remove non-readable special characters
-     *
      * @param originalString
      *            The input String
      * @return
@@ -314,54 +312,38 @@ public class JObjectUtil {
                 int numberOfSchemaFields = recordType.getFieldTypes().length;
                 byte[] recordBits = dis.getInputStream().getArray();
                 boolean isExpanded = false;
-                int s = dis.getInputStream().getPosition();
-                int recordOffset = s;
-                int openPartOffset = 0;
-                int offsetArrayOffset = 0;
+                dis.getInputStream();
                 int[] fieldOffsets = new int[numberOfSchemaFields];
                 IJObject[] closedFields = new IJObject[numberOfSchemaFields];
 
-                if (recordType == null) {
-                    openPartOffset = s + AInt32SerializerDeserializer.getInt(recordBits, s + 6);
-                    s += 8;
-                    isExpanded = true;
-                } else {
-                    dis.skip(4); // reading length is not required.
-                    if (recordType.isOpen()) {
-                        isExpanded = dis.readBoolean();
-                        if (isExpanded) {
-                            openPartOffset = s + dis.readInt(); // AInt32SerializerDeserializer.getInt(recordBits, s + 6);
-                        } else {
-                            // do nothing s += 6;
-                        }
+                dis.skip(4); // reading length is not required.
+                if (recordType.isOpen()) {
+                    isExpanded = dis.readBoolean();
+                    if (isExpanded) {
+                        dis.readInt();
                     } else {
-                        // do nothing s += 5;
                     }
+                } else {
                 }
 
                 if (numberOfSchemaFields > 0) {
-                    int numOfSchemaFields = dis.readInt(); //s += 4;
+                    dis.readInt();
                     int nullBitMapOffset = 0;
                     boolean hasNullableFields = NonTaggedFormatUtil.hasNullableField(recordType);
                     if (hasNullableFields) {
-                        nullBitMapOffset = dis.getInputStream().getPosition();//s
-                        offsetArrayOffset = dis.getInputStream().getPosition() //s
-                                + (numberOfSchemaFields % 8 == 0 ? numberOfSchemaFields / 8
-                                        : numberOfSchemaFields / 8 + 1);
+                        nullBitMapOffset = dis.getInputStream().getPosition();
+                        dis.getInputStream();
                     } else {
-                        offsetArrayOffset = dis.getInputStream().getPosition();
+                        dis.getInputStream();
                     }
                     for (int i = 0; i < numberOfSchemaFields; i++) {
-                        fieldOffsets[i] = dis.readInt(); // AInt32SerializerDeserializer.getInt(recordBits, offsetArrayOffset) + recordOffset;
-                        // offsetArrayOffset += 4;
+                        fieldOffsets[i] = dis.readInt();
                     }
                     for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
                         if (hasNullableFields) {
                             byte b1 = recordBits[nullBitMapOffset + fieldNumber / 8];
                             int p = 1 << (7 - (fieldNumber % 8));
                             if ((b1 & p) == 0) {
-                                // set null value (including type tag inside)
-                                //fieldValues.add(nullReference);
                                 continue;
                             }
                         }
@@ -373,8 +355,6 @@ public class JObjectUtil {
                             if (((AUnionType) fieldTypes[fieldNumber]).isNullableType()) {
                                 fieldType = ((AUnionType) fieldTypes[fieldNumber]).getNullableType();
                                 fieldValueTypeTag = fieldType.getTypeTag();
-                                //                      fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(recordBits,
-                                //                              fieldOffsets[fieldNumber], typeTag, false);
                             }
                         } else {
                             fieldValueTypeTag = fieldTypes[fieldNumber].getTypeTag();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java
new file mode 100644
index 0000000..aed8bb9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDataScanOperatorDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/*
+ * A single activity operator that provides the functionality of scanning data using an
+ * instance of the configured adapter.
+ */
+public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private IAdapterFactory adapterFactory;
+
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
+            IAdapterFactory dataSourceAdapterFactory) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactory = dataSourceAdapterFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+                    throws HyracksDataException {
+
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                IDataSourceAdapter adapter = null;
+                try {
+                    writer.open();
+                    adapter = adapterFactory.createAdapter(ctx, partition);
+                    adapter.start(partition, writer);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 59ad076..82ca715 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.operators;
 
-import java.io.File;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
new file mode 100644
index 0000000..a929eec
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * FeedCollectOperatorDescriptor is responsible for ingesting data from an external source. This
+ * operator uses a user specified for a built-in adaptor for retrieving data from the external
+ * data source.
+ */
+public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
+
+    /** The type associated with the ADM data output from the feed adaptor */
+    private final IAType outputType;
+
+    /** unique identifier for a feed instance. */
+    private final FeedConnectionId connectionId;
+
+    /** Map representation of policy parameters */
+    private final Map<String, String> feedPolicyProperties;
+
+    /** The (singleton) instance of {@code IFeedIngestionManager} **/
+    private IFeedSubscriptionManager subscriptionManager;
+
+    /** The source feed from which the feed derives its data from. **/
+    private final FeedId sourceFeedId;
+
+    /** The subscription location at which the recipient feed receives tuples from the source feed **/
+    private final ConnectionLocation subscriptionLocation;
+
+    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
+            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+            ConnectionLocation subscriptionLocation) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.outputType = atype;
+        this.connectionId = feedConnectionId;
+        this.feedPolicyProperties = feedPolicyProperties;
+        this.sourceFeedId = sourceFeedId;
+        this.subscriptionLocation = subscriptionLocation;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                    throws HyracksDataException {
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.subscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager()).getFeedSubscriptionManager();
+        ISubscribableRuntime sourceRuntime = null;
+        IOperatorNodePushable nodePushable = null;
+        switch (subscriptionLocation) {
+            case SOURCE_FEED_INTAKE_STAGE:
+                try {
+                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                            FeedRuntimeType.INTAKE, partition);
+                    sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
+                    if (sourceRuntime == null) {
+                        throw new HyracksDataException(
+                                "Source intake task not found for source feed id " + sourceFeedId);
+                    }
+                    nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+                            feedPolicyProperties, partition, nPartitions, sourceRuntime);
+
+                } catch (Exception exception) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
+                    }
+                    throw new HyracksDataException("Initialization of the feed adapter failed", exception);
+                }
+                break;
+            case SOURCE_FEED_COMPUTE_STAGE:
+                SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                        FeedRuntimeType.COMPUTE, partition);
+                sourceRuntime = subscriptionManager.getSubscribableRuntime(feedSubscribableRuntimeId);
+                if (sourceRuntime == null) {
+                    throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
+                            + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
+                }
+                nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+                        feedPolicyProperties, partition, nPartitions, sourceRuntime);
+                break;
+        }
+        return nodePushable;
+    }
+
+    public FeedConnectionId getFeedConnectionId() {
+        return connectionId;
+    }
+
+    public Map<String, String> getFeedPolicyProperties() {
+        return feedPolicyProperties;
+    }
+
+    public IAType getOutputType() {
+        return outputType;
+    }
+
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptors[0];
+    }
+
+    public FeedId getSourceFeedId() {
+        return sourceFeedId;
+    }
+
+    private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
+        int waitCycleCount = 0;
+        ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+        while (ingestionRuntime == null && waitCycleCount < 10) {
+            try {
+                Thread.sleep(2000);
+                waitCycleCount++;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                break;
+            }
+            ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+        }
+        return (IngestionRuntime) ingestionRuntime;
+    }
+
+    public ConnectionLocation getSubscriptionLocation() {
+        return subscriptionLocation;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
new file mode 100644
index 0000000..8916af6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedOperatorOutputSideHandler;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
+import org.apache.asterix.external.feed.dataflow.CollectTransformFeedFrameWriter;
+import org.apache.asterix.external.feed.dataflow.FeedCollectRuntimeInputHandler;
+import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedIntakeOperationDescriptor}
+ */
+public class FeedCollectOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+    private static Logger LOGGER = Logger.getLogger(FeedCollectOperatorNodePushable.class.getName());
+
+    private final int partition;
+    private final FeedConnectionId connectionId;
+    private final Map<String, String> feedPolicy;
+    private final FeedPolicyAccessor policyAccessor;
+    private final IFeedManager feedManager;
+    private final ISubscribableRuntime sourceRuntime;
+    private final IHyracksTaskContext ctx;
+    private final int nPartitions;
+
+    private RecordDescriptor outputRecordDescriptor;
+    private FeedRuntimeInputHandler inputSideHandler;
+    private CollectionRuntime collectRuntime;
+
+    public FeedCollectOperatorNodePushable(IHyracksTaskContext ctx, FeedId sourceFeedId,
+            FeedConnectionId feedConnectionId, Map<String, String> feedPolicy, int partition, int nPartitions,
+            ISubscribableRuntime sourceRuntime) {
+        this.ctx = ctx;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        this.connectionId = feedConnectionId;
+        this.sourceRuntime = sourceRuntime;
+        this.feedPolicy = feedPolicy;
+        policyAccessor = new FeedPolicyAccessor(feedPolicy);
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        try {
+            outputRecordDescriptor = recordDesc;
+            FeedRuntimeType sourceRuntimeType = ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId())
+                    .getFeedRuntimeType();
+            switch (sourceRuntimeType) {
+                case INTAKE:
+                    handleCompleteConnection();
+                    break;
+                case COMPUTE:
+                    handlePartialConnection();
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid source type " + sourceRuntimeType);
+            }
+
+            State state = collectRuntime.waitTillCollectionOver();
+            if (state.equals(State.FINISHED)) {
+                feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
+                        collectRuntime.getRuntimeId());
+                writer.close();
+                inputSideHandler.close();
+            } else if (state.equals(State.HANDOVER)) {
+                inputSideHandler.setMode(Mode.STALL);
+                writer.close();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ending Collect Operator, the input side handler is now in " + Mode.STALL
+                            + " and the output writer " + writer + " has been closed ");
+                }
+            }
+        } catch (InterruptedException ie) {
+            handleInterruptedException(ie);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    private void handleCompleteConnection() throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COLLECT, partition,
+                FeedRuntimeId.DEFAULT_OPERAND_ID);
+        collectRuntime = (CollectionRuntime) feedManager.getFeedConnectionManager().getFeedRuntime(connectionId,
+                runtimeId);
+        if (collectRuntime == null) {
+            beginNewFeed(runtimeId);
+        } else {
+            reviveOldFeed();
+        }
+    }
+
+    private void beginNewFeed(FeedRuntimeId runtimeId) throws Exception {
+        writer.open();
+        IFrameWriter outputSideWriter = writer;
+        if (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()
+                .equals(FeedRuntimeType.COMPUTE)) {
+            outputSideWriter = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime, outputRecordDescriptor,
+                    connectionId);
+            this.recordDesc = sourceRuntime.getRecordDescriptor();
+        }
+
+        FrameTupleAccessor tupleAccessor = new FrameTupleAccessor(recordDesc);
+        inputSideHandler = new FeedCollectRuntimeInputHandler(ctx, connectionId, runtimeId, outputSideWriter,
+                policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
+
+        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
+                sourceRuntime, feedPolicy);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+    }
+
+    private void reviveOldFeed() throws HyracksDataException {
+        writer.open();
+        collectRuntime.getFrameCollector().setState(State.ACTIVE);
+        inputSideHandler = collectRuntime.getInputHandler();
+
+        IFrameWriter innerWriter = inputSideHandler.getCoreOperator();
+        if (innerWriter instanceof CollectTransformFeedFrameWriter) {
+            ((CollectTransformFeedFrameWriter) innerWriter).reset(this.writer);
+        } else {
+            inputSideHandler.setCoreOperator(writer);
+        }
+
+        inputSideHandler.setMode(Mode.PROCESS);
+    }
+
+    private void handlePartialConnection() throws Exception {
+        FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.COMPUTE_COLLECT, partition,
+                FeedRuntimeId.DEFAULT_OPERAND_ID);
+        writer.open();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Beginning new feed (from existing partial connection):" + connectionId);
+        }
+        IFeedOperatorOutputSideHandler wrapper = new CollectTransformFeedFrameWriter(ctx, writer, sourceRuntime,
+                outputRecordDescriptor, connectionId);
+
+        inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, wrapper, policyAccessor, false,
+                new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
+
+        collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
+                feedPolicy);
+        feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
+        recordDesc = sourceRuntime.getRecordDescriptor();
+        sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
+    }
+
+    private void handleInterruptedException(InterruptedException ie) throws HyracksDataException {
+        if (policyAccessor.continueOnHardwareFailure()) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Continuing on failure as per feed policy, switching to " + Mode.STALL
+                        + " until failure is resolved");
+            }
+            inputSideHandler.setMode(Mode.STALL);
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Failure during feed ingestion. Deregistering feed runtime " + collectRuntime
+                        + " as feed is not configured to handle failures");
+            }
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, collectRuntime.getRuntimeId());
+            writer.close();
+            throw new HyracksDataException(ie);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
new file mode 100644
index 0000000..a18ebcd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * An operator responsible for establishing connection with external data source and parsing,
+ * translating the received content.It uses an instance of feed adaptor to perform these functions.
+ */
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
+
+    /** The unique identifier of the feed that is being ingested. **/
+    private final FeedId feedId;
+
+    private final FeedPolicyAccessor policyAccessor;
+
+    /** The adaptor factory that is used to create an instance of the feed adaptor **/
+    private IAdapterFactory adaptorFactory;
+
+    /** The library that contains the adapter in use. **/
+    private String adaptorLibraryName;
+
+    /**
+     * The adapter factory class that is used to create an instance of the feed adapter.
+     * This value is used only in the case of external adapters.
+     **/
+    private String adaptorFactoryClassName;
+
+    /** The configuration parameters associated with the adapter. **/
+    private Map<String, String> adaptorConfiguration;
+
+    private ARecordType adapterOutputType;
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
+            ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
+        super(spec, 0, 1);
+        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.adaptorFactory = adapterFactory;
+        this.adapterOutputType = adapterOutputType;
+        this.policyAccessor = policyAccessor;
+    }
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, String adapterLibraryName,
+            String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor) {
+        super(spec, 0, 1);
+        this.feedId = new FeedId(primaryFeed.getDataverseName(), primaryFeed.getFeedName());
+        this.adaptorFactoryClassName = adapterFactoryClassName;
+        this.adaptorLibraryName = adapterLibraryName;
+        this.adaptorConfiguration = primaryFeed.getAdapterConfiguration();
+        this.adapterOutputType = adapterOutputType;
+        this.policyAccessor = policyAccessor;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        IFeedSubscriptionManager feedSubscriptionManager = ((IFeedManager) runtimeCtx.getFeedManager())
+                .getFeedSubscriptionManager();
+        SubscribableFeedRuntimeId feedIngestionId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+                partition);
+        IngestionRuntime ingestionRuntime = (IngestionRuntime) feedSubscriptionManager
+                .getSubscribableRuntime(feedIngestionId);
+        if (adaptorFactory == null) {
+            try {
+                adaptorFactory = createExtenralAdapterFactory(ctx, partition);
+            } catch (Exception exception) {
+                throw new HyracksDataException(exception);
+            }
+
+        }
+        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, ingestionRuntime,
+                policyAccessor);
+    }
+
+    private IAdapterFactory createExtenralAdapterFactory(IHyracksTaskContext ctx, int partition) throws Exception {
+        IAdapterFactory adapterFactory = null;
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+                adaptorLibraryName);
+        if (classLoader != null) {
+            adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
+            adapterFactory.configure(adaptorConfiguration, adapterOutputType);
+        } else {
+            String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName
+                    + " in dataverse " + feedId.getDataverse();
+            LOGGER.severe(message);
+            throw new IllegalArgumentException(message);
+        }
+        return adapterFactory;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
new file mode 100644
index 0000000..b31f2bf
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IAdapterRuntimeManager;
+import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
+import org.apache.asterix.external.feed.api.ISubscriberRuntime;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.asterix.external.feed.runtime.CollectionRuntime;
+import org.apache.asterix.external.feed.runtime.IngestionRuntime;
+import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
+import org.apache.asterix.external.api.IFeedAdapter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedIntakeOperationDescriptor}.
+ * Provides the core functionality to set up the artifacts for ingestion of a feed.
+ * The artifacts are lazily activated when a feed receives a subscription request.
+ */
+public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+    private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+
+    private final FeedId feedId;
+    private final int partition;
+    private final IFeedSubscriptionManager feedSubscriptionManager;
+    private final IFeedManager feedManager;
+    private final IHyracksTaskContext ctx;
+    private final IAdapterFactory adapterFactory;
+
+    private IngestionRuntime ingestionRuntime;
+    private IFeedAdapter adapter;
+    private IIntakeProgressTracker tracker;
+    private DistributeFeedFrameWriter feedFrameWriter;
+
+    public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IAdapterFactory adapterFactory,
+            int partition, IngestionRuntime ingestionRuntime, FeedPolicyAccessor policyAccessor) {
+        this.ctx = ctx;
+        this.feedId = feedId;
+        this.partition = partition;
+        this.ingestionRuntime = ingestionRuntime;
+        this.adapterFactory = adapterFactory;
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.feedManager = (IFeedManager) runtimeCtx.getFeedManager();
+        this.feedSubscriptionManager = feedManager.getFeedSubscriptionManager();
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        IAdapterRuntimeManager adapterRuntimeManager = null;
+        try {
+            if (ingestionRuntime == null) {
+                try {
+                    adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+                    //TODO: Fix record tracking
+                    //                    if (adapterFactory.isRecordTrackingEnabled()) {
+                    //                        tracker = adapterFactory.createIntakeProgressTracker();
+                    //                    }
+                } catch (Exception e) {
+                    LOGGER.severe("Unable to create adapter : " + adapterFactory.getAlias() + "[" + partition + "]"
+                            + " Exception " + e);
+                    throw new HyracksDataException(e);
+                }
+                FrameTupleAccessor fta = new FrameTupleAccessor(recordDesc);
+                feedFrameWriter = new DistributeFeedFrameWriter(ctx, feedId, writer, FeedRuntimeType.INTAKE, partition,
+                        fta, feedManager);
+                adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, tracker, feedFrameWriter, partition);
+                SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
+                        partition);
+                ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
+                        adapterRuntimeManager);
+                feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
+                feedFrameWriter.open();
+            } else {
+                if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+                    ingestionRuntime.getAdapterRuntimeManager().setState(State.ACTIVE_INGESTION);
+                    adapter = ingestionRuntime.getAdapterRuntimeManager().getFeedAdapter();
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info(" Switching to " + State.ACTIVE_INGESTION + " for ingestion runtime "
+                                + ingestionRuntime);
+                        LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+                                + " connected to backend for feed " + feedId);
+                    }
+                    feedFrameWriter = ingestionRuntime.getFeedFrameWriter();
+                } else {
+                    String message = "Feed Ingestion Runtime for feed " + feedId
+                            + " is already registered and is active!.";
+                    LOGGER.severe(message);
+                    throw new IllegalStateException(message);
+                }
+            }
+
+            waitTillIngestionIsOver(adapterRuntimeManager);
+            feedSubscriptionManager
+                    .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
+            if (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)) {
+                throw new HyracksDataException("Unable to ingest data");
+            }
+
+        } catch (InterruptedException ie) {
+            /*
+             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another node involved in the Hyracks job.
+             * As the Intake job involves only the intake operator, the exception is indicative of a failure at the sibling intake operator location.
+             * The surviving intake partitions must continue to live and receive data from the external source.
+             */
+            List<ISubscriberRuntime> subscribers = ingestionRuntime.getSubscribers();
+            FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(new HashMap<String, String>());
+            boolean needToHandleFailure = false;
+            List<ISubscriberRuntime> failingSubscribers = new ArrayList<ISubscriberRuntime>();
+            for (ISubscriberRuntime subscriber : subscribers) {
+                policyAccessor.reset(subscriber.getFeedPolicy());
+                if (!policyAccessor.continueOnHardwareFailure()) {
+                    failingSubscribers.add(subscriber);
+                } else {
+                    needToHandleFailure = true;
+                }
+            }
+
+            for (ISubscriberRuntime failingSubscriber : failingSubscribers) {
+                try {
+                    ingestionRuntime.unsubscribeFeed((CollectionRuntime) failingSubscriber);
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning(
+                                "Excpetion in unsubscribing " + failingSubscriber + " message " + e.getMessage());
+                    }
+                }
+            }
+
+            if (needToHandleFailure) {
+                ingestionRuntime.getAdapterRuntimeManager().setState(State.INACTIVE_INGESTION);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Switching to " + State.INACTIVE_INGESTION + " on occurrence of failure.");
+                }
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(
+                            "Interrupted Exception. None of the subscribers need to handle failures. Shutting down feed ingestion");
+                }
+                feedSubscriptionManager
+                        .deregisterFeedSubscribableRuntime((SubscribableFeedRuntimeId) ingestionRuntime.getRuntimeId());
+                throw new HyracksDataException(ie);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        } finally {
+            if (ingestionRuntime != null
+                    && !ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {
+                feedFrameWriter.close();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Closed Frame Writer " + feedFrameWriter + " adapter state "
+                            + ingestionRuntime.getAdapterRuntimeManager().getState());
+                }
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Ending intake operator node pushable in state " + State.INACTIVE_INGESTION
+                            + " Will resume after correcting failure");
+                }
+            }
+
+        }
+    }
+
+    private void waitTillIngestionIsOver(IAdapterRuntimeManager adapterRuntimeManager) throws InterruptedException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Waiting for adaptor [" + partition + "]" + "to be done with ingestion of feed " + feedId);
+        }
+        synchronized (adapterRuntimeManager) {
+            while (!(adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FINISHED_INGESTION)
+                    || (adapterRuntimeManager.getState().equals(IAdapterRuntimeManager.State.FAILED_INGESTION)))) {
+                adapterRuntimeManager.wait();
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" Adaptor " + adapter.getClass().getName() + "[" + partition + "]"
+                    + " done with ingestion of feed " + feedId);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
new file mode 100644
index 0000000..219110f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessageOperatorDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.external.feed.api.IFeedMessage;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Sends a control message to the registered message queue for feed specified by its feedId.
+ */
+public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+    private final IFeedMessage feedMessage;
+
+    public FeedMessageOperatorDescriptor(JobSpecification spec, FeedConnectionId connectionId,
+            IFeedMessage feedMessage) {
+        super(spec, 0, 1);
+        this.connectionId = connectionId;
+        this.feedMessage = feedMessage;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new FeedMessageOperatorNodePushable(ctx, connectionId, feedMessage, partition, nPartitions);
+    }
+
+}