You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:10 UTC

[12/21] incubator-asterixdb git commit: First stage of external data cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
new file mode 100644
index 0000000..9864805
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataExceptionUtils;
+
+/**
+ * Splits a character stream into records delimited by a configurable start/end character pair. Nested
+ * start/end pairs are tracked by depth, and delimiters appearing inside quoted strings (with escape handling)
+ * are ignored.
+ */
+public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
+
+    /** Current nesting depth of recordStart/recordEnd pairs inside the record being scanned. */
+    private int depth;
+    /** True when the previous character inside a string was an unescaped escape character. */
+    private boolean prevCharEscape;
+    /** True while scanning inside a quoted string, where record delimiters are ignored. */
+    private boolean inString;
+    /** Character that opens a record (and nested objects). */
+    private char recordStart;
+    /** Character that closes a record (and nested objects). */
+    private char recordEnd;
+    /** Number of complete records returned by hasNext() so far. */
+    private int recordNumber = 0;
+
+    /**
+     * Returns the number of complete records found by {@link #hasNext()} so far.
+     */
+    public int getRecordNumber() {
+        return recordNumber;
+    }
+
+    /**
+     * Configures the reader. In addition to the superclass settings, reads the optional
+     * {@link ExternalDataConstants#KEY_RECORD_START} and {@link ExternalDataConstants#KEY_RECORD_END}
+     * parameters. Each must be exactly one character long; the defaults from {@link ExternalDataConstants} are
+     * used when a parameter is absent.
+     *
+     * @throws AsterixException if a delimiter parameter is present but is not exactly one character
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        super.configure(configuration);
+        recordStart = getSingleCharParameter(configuration, ExternalDataConstants.KEY_RECORD_START,
+                ExternalDataConstants.DEFAULT_RECORD_START);
+        recordEnd = getSingleCharParameter(configuration, ExternalDataConstants.KEY_RECORD_END,
+                ExternalDataConstants.DEFAULT_RECORD_END);
+    }
+
+    /** Reads a one-character parameter, returning {@code defaultValue} when the key is absent. */
+    private static char getSingleCharParameter(Map<String, String> configuration, String key, char defaultValue)
+            throws AsterixException {
+        String value = configuration.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value.length() != 1) {
+            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(key,
+                    ExternalDataConstants.PARAMETER_OF_SIZE_ONE, value));
+        }
+        return value.charAt(0);
+    }
+
+    /**
+     * Reads the next record into {@code record}.
+     * <p>
+     * Whitespace (space, tab, LF, CR) before the record start character is skipped. The record is then scanned
+     * until the record end character that brings the nesting depth back to zero. Delimiters inside quoted
+     * strings are ignored, and escape characters inside strings are honoured.
+     *
+     * @return true if a complete record was read; false at end of stream. NOTE(review): if the stream ends in
+     *         the middle of a record, the partial record is discarded and false is returned.
+     * @throws IOException if a non-whitespace character precedes the record start. In that case
+     *         {@code reader.skipError()} is called and the buffered input is discarded first.
+     */
+    @Override
+    public boolean hasNext() throws Exception {
+        record.reset();
+        boolean hasStarted = false;
+        boolean hasFinished = false;
+        prevCharEscape = false;
+        inString = false;
+        depth = 0;
+        do {
+            int startPosn = bufferPosn; // starting from where we left off the last time
+            if (bufferPosn >= bufferLength) {
+                // buffer exhausted: refill it from the underlying reader
+                startPosn = bufferPosn = 0;
+                bufferLength = reader.read(inputBuffer);
+                if (bufferLength <= 0) {
+                    return false; // EOF
+                }
+            }
+            if (!hasStarted) {
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
+                    if (inputBuffer[bufferPosn] == recordStart) {
+                        startPosn = bufferPosn;
+                        hasStarted = true;
+                        depth = 1;
+                        ++bufferPosn; // at next invocation proceed from following byte
+                        break;
+                    } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.LF
+                            && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
+                        // corrupted file. clear the buffer and stop reading
+                        reader.skipError();
+                        bufferPosn = bufferLength = 0;
+                        throw new IOException("Malformed input stream");
+                    }
+                }
+            }
+            if (hasStarted) {
+                for (; bufferPosn < bufferLength; ++bufferPosn) { // search for the matching record end
+                    if (inString) {
+                        // we are in a string, we only care about the string end
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+                            inString = false;
+                        }
+                        if (prevCharEscape) {
+                            prevCharEscape = false;
+                        } else {
+                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                        }
+                    } else {
+                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+                            inString = true;
+                        } else if (inputBuffer[bufferPosn] == recordStart) {
+                            depth += 1;
+                        } else if (inputBuffer[bufferPosn] == recordEnd) {
+                            depth -= 1;
+                            if (depth == 0) {
+                                hasFinished = true;
+                                ++bufferPosn; // at next invocation proceed from following byte
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Copy bytes only once the record has started. Before that, the scanned span is inter-record
+            // whitespace, which would otherwise leak into the front of the record whenever the record start
+            // lands in a later buffer refill.
+            int appendLength = bufferPosn - startPosn;
+            if (hasStarted && appendLength > 0) {
+                record.append(inputBuffer, startPosn, appendLength);
+            }
+        } while (!hasFinished);
+        record.endRecord();
+        recordNumber++;
+        return true;
+    }
+
+    /**
+     * Always returns false. NOTE(review): presumably this means stopping is not supported by this reader.
+     */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
new file mode 100644
index 0000000..c294ccb
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for Hadoop SequenceFiles. It seeks to a record's stored offset and reads the key/value pair
+ * found there, exposing the value through the inherited {@code value} field.
+ */
+public class SequenceLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(SequenceLookupReader.class.getName());
+    /** Reader over the currently open file; null when no file is open. */
+    private Reader reader;
+    /** Reusable key instance required by {@link Reader#next(Writable, Writable)}; never read by this class. */
+    private Writable key;
+
+    public SequenceLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /**
+     * Seeks to the offset of {@code rid} and reads the next key/value pair into {@code key} and {@code value}.
+     */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        reader.seek(rid.getOffset());
+        reader.next(key, value);
+    }
+
+    /**
+     * Closes the current file, if any. Close failures are logged rather than propagated. The reference is
+     * cleared afterwards, so a repeated call does not close the same reader twice.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file ", e);
+        } finally {
+            reader = null;
+        }
+    }
+
+    /**
+     * Opens the current {@code file} and instantiates key/value objects of the classes reported by the reader.
+     * NOTE(review): the value is cast to {@link Text}; a file whose value class is not Text fails here with a
+     * ClassCastException.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        reader = new SequenceFile.Reader(fs, new Path(file.getFileName()), conf);
+        key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
new file mode 100644
index 0000000..b276bfa
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup reader for text files. It seeks to a record's stored offset and reads the line found there into the
+ * inherited {@code value} field.
+ */
+public class TextLookupReader extends AbstractCharRecordLookupReader {
+
+    private static final Logger LOGGER = Logger.getLogger(TextLookupReader.class.getName());
+    /** Line reader; created lazily on the first openFile() and reused for subsequent files. */
+    private HDFSTextLineReader reader;
+
+    public TextLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    /**
+     * Seeks to the offset of {@code rid} and reads one line into {@code value}.
+     */
+    @Override
+    protected void readRecord(RecordId rid) throws IOException {
+        reader.seek(rid.getOffset());
+        reader.readLine(value);
+    }
+
+    /**
+     * Closes the line reader, if one exists. Close failures are logged rather than propagated.
+     */
+    @Override
+    protected void closeFile() {
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (Exception e) {
+            LOGGER.warn("Error closing HDFS file ", e);
+        }
+    }
+
+    /**
+     * Opens the current {@code file} and points the (reused) line reader at the new stream via
+     * {@code resetReader}.
+     */
+    @Override
+    protected void openFile() throws IllegalArgumentException, IOException {
+        if (reader == null) {
+            reader = new HDFSTextLineReader();
+        }
+        reader.resetReader(fs.open(new Path(file.getFileName())));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
new file mode 100644
index 0000000..34d8122
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Status;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+/**
+ * Pull-based Twitter reader. It periodically runs a search query and returns the resulting tweets one at a
+ * time, asking each new search only for tweets newer than the highest id already returned.
+ */
+public class TwitterPullRecordReader implements IRecordReader<Status> {
+
+    private String keywords;
+    private Query query;
+    private Twitter twitter;
+    /** Delay before each search request, in seconds; kept at this default when INTERVAL is not configured. */
+    private int requestInterval = 5; // seconds
+    /** Most recent search result page; null before the first search. */
+    private QueryResult result;
+    /** Index of the next tweet to return from {@code result}. */
+    private int nextTweetIndex = 0;
+    /** Highest tweet id returned so far; used as the since-id of the next search. */
+    private long lastTweetIdReceived = 0;
+    /** Reusable record wrapper returned by next(). */
+    private GenericRecord<Status> record;
+
+    public TwitterPullRecordReader() {
+    }
+
+    /** No resources are held, so there is nothing to close. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Creates the Twitter client and the search query (up to 100 results per page) from the configuration.
+     * {@link SearchAPIConstants#INTERVAL} is optional; when it is absent, the default interval is kept.
+     *
+     * @throws NumberFormatException if INTERVAL is present but is not an integer
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        twitter = TwitterUtil.getTwitterService(configuration);
+        keywords = configuration.get(SearchAPIConstants.QUERY);
+        String interval = configuration.get(SearchAPIConstants.INTERVAL);
+        if (interval != null) {
+            requestInterval = Integer.parseInt(interval.trim());
+        }
+        query = new Query(keywords);
+        query.setCount(100);
+        record = new GenericRecord<Status>();
+    }
+
+    /** Always returns true; end-of-input is never signalled here. */
+    @Override
+    public boolean hasNext() throws Exception {
+        return true;
+    }
+
+    /**
+     * Returns the next tweet.
+     * <p>
+     * When the current result page is exhausted (or no search has run yet), sleeps for
+     * {@code requestInterval} seconds and then issues a new search for tweets newer than the last one returned.
+     *
+     * @return the next tweet wrapped in the reusable record, or null if the latest search returned no tweets
+     * @throws HyracksDataException wrapping a {@link TwitterException} raised by the search
+     * @throws InterruptedException if interrupted while waiting between searches
+     */
+    @Override
+    public IRawRecord<Status> next() throws IOException, InterruptedException {
+        if (result == null || nextTweetIndex >= result.getTweets().size()) {
+            // long arithmetic so large intervals cannot overflow int milliseconds
+            Thread.sleep(1000L * requestInterval);
+            query.setSinceId(lastTweetIdReceived);
+            try {
+                result = twitter.search(query);
+            } catch (TwitterException e) {
+                throw new HyracksDataException(e);
+            }
+            nextTweetIndex = 0;
+        }
+        if (result != null && !result.getTweets().isEmpty()) {
+            List<Status> tw = result.getTweets();
+            Status tweet = tw.get(nextTweetIndex++);
+            if (lastTweetIdReceived < tweet.getId()) {
+                lastTweetIdReceived = tweet.getId();
+            }
+            record.set(tweet);
+            return record;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Class<Status> getRecordClass() throws IOException {
+        return Status.class;
+    }
+
+    /** Always returns false. */
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
new file mode 100644
index 0000000..e7c141d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.TwitterUtil;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+
+public class TwitterPushRecordReader implements IRecordReader<Status> {
+    private LinkedBlockingQueue<Status> inputQ;
+    private TwitterStream twitterStream;
+    private GenericRecord<Status> record;
+
+    @Override
+    public void close() throws IOException {
+        twitterStream.clearListeners();
+        twitterStream.cleanUp();
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        record = new GenericRecord<Status>();
+        inputQ = new LinkedBlockingQueue<Status>();
+        twitterStream = TwitterUtil.getTwitterStream(configuration);
+        twitterStream.addListener(new TweetListener(inputQ));
+        FilterQuery query = TwitterUtil.getFilterQuery(configuration);
+        if (query != null) {
+            twitterStream.filter(query);
+        } else {
+            twitterStream.sample();
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return true;
+    }
+
+    @Override
+    public IRawRecord<Status> next() throws IOException, InterruptedException {
+        Status tweet = inputQ.poll();
+        if (tweet == null) {
+            return null;
+        }
+        record.set(tweet);
+        return record;
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() throws IOException {
+        return Status.class;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    private class TweetListener implements StatusListener {
+
+        private LinkedBlockingQueue<Status> inputQ;
+
+        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            inputQ.add(tweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
new file mode 100644
index 0000000..e9fad25
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.ILookupReaderFactory;
+import org.apache.asterix.external.api.ILookupRecordReader;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.input.record.reader.RCLookupReader;
+import org.apache.asterix.external.input.record.reader.SequenceLookupReader;
+import org.apache.asterix.external.input.record.reader.TextLookupReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
+
+    protected static final long serialVersionUID = 1L;
+    protected transient AlgebricksPartitionConstraint clusterLocations;
+    protected ConfFactory confFactory;
+    protected Map<String, String> configuration;
+
+    public HDFSLookupReaderFactory() {
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
+        return clusterLocations;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
+        confFactory = new ConfFactory(conf);
+
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
+            ExternalFileIndexAccessor snapshotAccessor) throws Exception {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
+        JobConf conf = confFactory.getConf();
+        FileSystem fs = FileSystem.get(conf);
+        switch (inputFormatParameter) {
+            case ExternalDataConstants.INPUT_FORMAT_TEXT:
+                return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
+                return (ILookupRecordReader<? extends T>) new SequenceLookupReader(snapshotAccessor, fs, conf);
+            case ExternalDataConstants.INPUT_FORMAT_RC:
+                return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
+            default:
+                throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
new file mode 100644
index 0000000..05d419d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.LineRecordReader;
+import org.apache.asterix.external.input.record.reader.QuotedLineRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        LineRecordReader recordReader;
+        if (quoteString != null) {
+            recordReader = new QuotedLineRecordReader();
+        } else {
+            recordReader = new LineRecordReader();
+        }
+        return configureReader(recordReader, ctx, partition);
+    }
+
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
new file mode 100644
index 0000000..a672f2f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.RSSRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Factory for {@link RSSRecordReader}s.
+ * <p>
+ * The {@link ExternalDataConstants#KEY_RSS_URL} parameter holds one or more comma-separated feed URLs. One
+ * partition, and therefore one reader, is created per URL.
+ */
+public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+    // Feed URLs parsed from the configuration; the URL at index i is read by partition i.
+    private final List<String> urls = new ArrayList<String>();
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Requests exactly one partition per configured URL.
+     */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(urls.size());
+    }
+
+    /**
+     * Stores the configuration and parses the feed URL list.
+     *
+     * @throws IllegalArgumentException if no URL parameter is given or it contains no non-blank URL
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
+        if (url == null) {
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+        initializeURLs(url);
+        if (urls.isEmpty()) {
+            // e.g. the parameter was blank or consisted only of commas
+            throw new IllegalArgumentException("no RSS URL provided");
+        }
+    }
+
+    /**
+     * Replaces {@link #urls} with the entries of a comma-separated list.
+     * <p>
+     * Surrounding whitespace is trimmed and empty entries are dropped, so {@code "http://a, http://b,"} yields
+     * exactly {@code [http://a, http://b]} rather than a URL with a leading space or an empty URL.
+     */
+    private void initializeURLs(String url) {
+        urls.clear();
+        for (String rssURL : url.split(",")) {
+            String trimmed = rssURL.trim();
+            if (!trimmed.isEmpty()) {
+                urls.add(trimmed);
+            }
+        }
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates a reader for the URL assigned to {@code partition}, which must be in {@code [0, urls.size())}.
+     */
+    @Override
+    public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        RSSRecordReader reader = new RSSRecordReader(urls.get(partition));
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
new file mode 100644
index 0000000..91b439c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.SemiStructuredRecordReader;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for {@link SemiStructuredRecordReader}s, which produce {@code char[]} records.
+ */
+public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Creates a fresh reader for {@code partition} and passes it through {@code configureReader}.
+     */
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        return configureReader(new SemiStructuredRecordReader(), ctx, partition);
+    }
+
+    @Override
+    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
+        // Nothing factory-specific to configure.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
new file mode 100644
index 0000000..72aaa37
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.factory;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.TwitterPullRecordReader;
+import org.apache.asterix.external.input.record.reader.TwitterPushRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.TwitterUtil;
+import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
+import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import twitter4j.Status;
+
+/**
+ * Factory for Twitter record readers producing {@link Status} records.
+ * <p>
+ * Depending on whether the configuration selects pull or push mode, it creates a
+ * {@link TwitterPullRecordReader} or a {@link TwitterPushRecordReader}. Intake always uses
+ * {@code INTAKE_CARDINALITY} (1) partition.
+ */
+public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
+
+    private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+    private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
+
+    private Map<String, String> configuration;
+    // true when configured for pull mode, false for push mode
+    private boolean pull;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    }
+
+    /**
+     * Validates and stores the adapter configuration.
+     * <p>
+     * The four OAuth parameters must be present after
+     * {@link TwitterUtil#initializeConfigurationWithAuthInfo} has run. In pull mode,
+     * {@link SearchAPIConstants#QUERY} is required. {@link SearchAPIConstants#INTERVAL}, when given, must be an
+     * integer; when absent, {@code DEFAULT_INTERVAL} is put into {@code configuration}.
+     *
+     * @throws AsterixException
+     *             if authentication parameters are missing, the query is missing in pull mode, or neither pull
+     *             nor push mode is selected
+     * @throws IllegalArgumentException
+     *             if the interval parameter is not a number
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+        if (!validateConfiguration(configuration)) {
+            throw new AsterixException(missingAuthenticationMessage());
+        }
+        if (ExternalDataUtils.isPull(configuration)) {
+            pull = true;
+            configurePullMode(configuration);
+        } else if (ExternalDataUtils.isPush(configuration)) {
+            pull = false;
+        } else {
+            throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
+                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
+        }
+    }
+
+    /** Checks the pull-mode parameters and fills in the default polling interval when it is absent. */
+    private void configurePullMode(Map<String, String> configuration) throws AsterixException {
+        if (configuration.get(SearchAPIConstants.QUERY) == null) {
+            throw new AsterixException(
+                    "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
+        }
+        String interval = configuration.get(SearchAPIConstants.INTERVAL);
+        if (interval != null) {
+            try {
+                Integer.parseInt(interval);
+            } catch (NumberFormatException nfe) {
+                // keep the parse failure as the cause so the offending value is visible
+                throw new IllegalArgumentException(
+                        "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number",
+                        nfe);
+            }
+        } else {
+            configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+                        + DEFAULT_INTERVAL + ")");
+            }
+        }
+    }
+
+    /** Builds the error message listing the OAuth parameters the adapter requires. */
+    private static String missingAuthenticationMessage() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("One or more parameters are missing from adapter configuration\n");
+        builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
+        builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
+        builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
+        builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
+        return builder.toString();
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates a pull or push reader, according to the mode selected in {@link #configure}, and configures it
+     * with the stored configuration.
+     */
+    @Override
+    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
+        IRecordReader<Status> reader;
+        if (pull) {
+            reader = new TwitterPullRecordReader();
+        } else {
+            reader = new TwitterPushRecordReader();
+        }
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() {
+        return Status.class;
+    }
+
+    /** Returns true when all four OAuth parameters are present in {@code configuration}. */
+    private boolean validateConfiguration(Map<String, String> configuration) {
+        return configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY) != null
+                && configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN) != null
+                && configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET) != null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
new file mode 100644
index 0000000..73f6195
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.InputStream;
+
+/**
+ * Base class for the input streams in this package. It extends {@link InputStream} with two hooks:
+ * {@link #skipError()} and {@link #stop()}.
+ */
+public abstract class AInputStream extends InputStream {
+
+    /**
+     * Stops the stream.
+     *
+     * @return NOTE(review): the meaning of the result is implementation-defined; the implementations in this
+     *         package all return {@code false}. Confirm the intended contract.
+     * @throws Exception
+     *             if stopping fails
+     */
+    public abstract boolean stop() throws Exception;
+
+    /**
+     * Hook invoked to get past a read error.
+     *
+     * @return presumably {@code true} when reading may continue after the error. TODO confirm: the socket
+     *         stream reconnects and returns {@code true}, while the basic stream returns {@code false}.
+     * @throws Exception
+     *             if recovery itself fails
+     */
+    public abstract boolean skipError() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
new file mode 100644
index 0000000..e573f74
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * An {@link InputStreamReader} over an {@link AInputStream} that also exposes the stream's
+ * {@link AInputStream#skipError()} hook.
+ * <p>
+ * Unless a charset is given explicitly, bytes are decoded as UTF-8 rather than with the platform default
+ * charset, so decoding does not depend on the JVM's locale settings.
+ */
+public class AInputStreamReader extends InputStreamReader {
+    private final AInputStream in;
+
+    /** Creates a reader that decodes {@code in} as UTF-8. */
+    public AInputStreamReader(AInputStream in) {
+        this(in, StandardCharsets.UTF_8);
+    }
+
+    /** Creates a reader that decodes {@code in} with the given charset. */
+    public AInputStreamReader(AInputStream in, Charset charset) {
+        super(in, charset);
+        this.in = in;
+    }
+
+    /** Delegates to the underlying stream's {@link AInputStream#skipError()}. */
+    public boolean skipError() throws Exception {
+        return in.skipError();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
new file mode 100644
index 0000000..aa7a3d8
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/BasicInputStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link AInputStream} that forwards every {@link InputStream} operation to a wrapped stream.
+ * {@link #skipError()} and {@link #stop()} both always return {@code false}.
+ */
+public class BasicInputStream extends AInputStream {
+    // Target of every read, skip, mark and close call.
+    private final InputStream delegate;
+
+    public BasicInputStream(InputStream in) {
+        delegate = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        return delegate.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException {
+        return delegate.available();
+    }
+
+    @Override
+    public boolean markSupported() {
+        return delegate.markSupported();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        delegate.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        delegate.reset();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    @Override
+    public boolean skipError() {
+        return false;
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
new file mode 100644
index 0000000..b3ad1c3
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.provider.ExternalIndexerProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HDFSInputStreamProvider<K> extends HDFSRecordReader<K, Text> implements IInputStreamProvider {
+
+    public HDFSInputStreamProvider(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
+            JobConf conf, Map<String, String> configuration, List<ExternalFile> snapshot) throws Exception {
+        super(read, inputSplits, readSchedule, nodeName, conf);
+        value = new Text();
+        configure(configuration);
+        if (snapshot != null) {
+            setSnapshot(snapshot);
+            setIndexer(ExternalIndexerProvider.getIndexer(configuration));
+            if (currentSplitIndex < snapshot.size()) {
+                indexer.reset(this);
+            }
+        }
+    }
+
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        return new HDFSInputStream();
+    }
+
+    private class HDFSInputStream extends AInputStream {
+        int pos = 0;
+
+        @Override
+        public int read() throws IOException {
+            if (value.getLength() < pos) {
+                if (!readMore()) {
+                    return -1;
+                }
+            } else if (value.getLength() == pos) {
+                pos++;
+                return ExternalDataConstants.EOL;
+            }
+            return value.getBytes()[pos++];
+        }
+
+        private int readRecord(byte[] buffer, int offset, int len) {
+            int actualLength = value.getLength() + 1;
+            if ((actualLength - pos) > len) {
+                //copy partial record
+                System.arraycopy(value.getBytes(), pos, buffer, offset, len);
+                pos += len;
+                return len;
+            } else {
+                int numBytes = value.getLength() - pos;
+                System.arraycopy(value.getBytes(), pos, buffer, offset, numBytes);
+                buffer[offset + numBytes] = ExternalDataConstants.LF;
+                pos += numBytes;
+                numBytes++;
+                return numBytes;
+            }
+        }
+
+        @Override
+        public int read(byte[] buffer, int offset, int len) throws IOException {
+            if (value.getLength() > pos) {
+                return readRecord(buffer, offset, len);
+            }
+            if (!readMore()) {
+                return -1;
+            }
+            return readRecord(buffer, offset, len);
+        }
+
+        private boolean readMore() throws IOException {
+            try {
+                pos = 0;
+                return HDFSInputStreamProvider.this.hasNext();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public boolean skipError() throws Exception {
+            return true;
+        }
+
+        @Override
+        public boolean stop() throws Exception {
+            return false;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
new file mode 100644
index 0000000..b511617
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Opens the local file assigned to one partition as an {@link AInputStream}.
+ */
+public class LocalFSInputStreamProvider implements IInputStreamProvider {
+
+    private final FileSplit[] fileSplits;
+    private final int partition;
+
+    /**
+     * @param fileSplits
+     *            the file splits, indexed by partition
+     * @param ctx
+     *            task context (not used by this provider)
+     * @param configuration
+     *            adapter configuration (not used by this provider)
+     * @param partition
+     *            index into {@code fileSplits} of the file this provider opens
+     */
+    public LocalFSInputStreamProvider(FileSplit[] fileSplits, IHyracksTaskContext ctx,
+            Map<String, String> configuration, int partition) {
+        this.fileSplits = fileSplits;
+        this.partition = partition;
+    }
+
+    /**
+     * Opens a new stream over this partition's file.
+     *
+     * @throws IOException
+     *             wrapping the {@link FileNotFoundException} when the file cannot be opened
+     */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        File file = fileSplits[partition].getLocalFile().getFile();
+        try {
+            InputStream fileStream = new FileInputStream(file);
+            return new BasicInputStream(fileStream);
+        } catch (FileNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
new file mode 100644
index 0000000..2253a73
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * An {@link AInputStream} that reads from client connections accepted on a {@link ServerSocket}.
+ * <p>
+ * When the current client closes its connection (end of stream) or {@link #skipError()} is called, that
+ * connection is closed and the stream blocks until the next client connects. The read methods therefore never
+ * return end of stream.
+ */
+public class SocketInputStream extends AInputStream {
+    private final ServerSocket server;
+    private Socket socket;
+    private InputStream connectionStream;
+
+    /**
+     * Blocks until the first client connects to {@code server}.
+     */
+    public SocketInputStream(ServerSocket server) throws IOException {
+        this.server = server;
+        socket = server.accept();
+        connectionStream = socket.getInputStream();
+    }
+
+    @Override
+    public int read() throws IOException {
+        int read = connectionStream.read();
+        while (read < 0) {
+            accept();
+            read = connectionStream.read();
+        }
+        return read;
+    }
+
+    /** Drops the current connection and waits for the next client. */
+    @Override
+    public boolean skipError() throws Exception {
+        accept();
+        return true;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int read = connectionStream.read(b, off, len);
+        while (read < 0) {
+            accept();
+            read = connectionStream.read(b, off, len);
+        }
+        return read;
+    }
+
+    /** Skipping is not supported; always reports that 0 bytes were skipped. */
+    @Override
+    public long skip(long n) throws IOException {
+        return 0;
+    }
+
+    /**
+     * NOTE(review): always reports 1 byte available regardless of the connection's actual state. This is kept
+     * as-is in case readers rely on it; confirm before changing it to {@code connectionStream.available()}.
+     */
+    @Override
+    public int available() throws IOException {
+        return 1;
+    }
+
+    /**
+     * Closes the connection stream, the client socket and the server socket. Each one is closed even if
+     * closing an earlier one fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            connectionStream.close();
+        } finally {
+            try {
+                socket.close();
+            } finally {
+                server.close();
+            }
+        }
+    }
+
+    /** Closes the current connection (socket included, even on failure) and blocks for the next client. */
+    private void accept() throws IOException {
+        try {
+            connectionStream.close();
+        } finally {
+            socket.close();
+        }
+        socket = server.accept();
+        connectionStream = socket.getInputStream();
+    }
+
+    @Override
+    public boolean stop() throws Exception {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
new file mode 100644
index 0000000..1f920e9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.net.ServerSocket;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+
+/**
+ * Provides input streams that read from clients connecting to a given {@link ServerSocket}.
+ */
+public class SocketInputStreamProvider implements IInputStreamProvider {
+    // never reassigned after construction
+    private final ServerSocket server;
+
+    /**
+     * @param server the listening socket that every stream created by this provider reads from
+     */
+    public SocketInputStreamProvider(ServerSocket server) {
+        this.server = server;
+    }
+
+    /**
+     * Creates a new stream over the server socket. Every stream returned wraps the same {@code server}.
+     */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        return new SocketInputStream(server);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
new file mode 100644
index 0000000..d32a94f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.runtime.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Provides a stream of synthetic tweets. A {@link TweetGenerator} running on a background thread writes into a
+ * {@link PipedOutputStream}; readers consume the connected {@link PipedInputStream}.
+ */
+public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider {
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStreamProvider.class.getName());
+
+    private final ExecutorService executorService;
+
+    private final PipedOutputStream outputStream;
+
+    private final PipedInputStream inputStream;
+
+    private final TwitterServer twitterServer;
+
+    /**
+     * @param configuration adapter configuration, forwarded to the {@link TweetGenerator} and also read for the
+     *            {@code mode} (and, in CONTROLLED mode, TPS) setting
+     * @param ctx task context (currently unused)
+     * @param partition partition index forwarded to the {@link TweetGenerator}
+     */
+    public TwitterFirehoseInputStreamProvider(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
+            throws Exception {
+        executorService = Executors.newCachedThreadPool();
+        outputStream = new PipedOutputStream();
+        inputStream = new PipedInputStream(outputStream);
+        twitterServer = new TwitterServer(configuration, partition, outputStream, executorService, inputStream);
+    }
+
+    /**
+     * Starts the background generator (if it is not already running) and returns the stream it feeds.
+     */
+    @Override
+    public AInputStream getInputStream() throws Exception {
+        twitterServer.start();
+        return twitterServer;
+    }
+
+    /**
+     * Stream view over the pipe filled by a {@link DataProvider}. The provider is submitted to the executor at
+     * most once, either by {@link #start()} or lazily by the first read.
+     */
+    private static class TwitterServer extends AInputStream {
+        private final DataProvider dataProvider;
+        private final ExecutorService executorService;
+        private final InputStream in;
+        // volatile so the unsynchronized check in read() sees a start() done on another thread
+        private volatile boolean started;
+
+        public TwitterServer(Map<String, String> configuration, int partition, OutputStream os,
+                ExecutorService executorService, InputStream in) throws Exception {
+            dataProvider = new DataProvider(configuration, partition, os);
+            this.executorService = executorService;
+            this.in = in;
+            this.started = false;
+        }
+
+        /**
+         * Asks the data provider to stop generating after the batch in progress.
+         *
+         * @return always {@code true}
+         */
+        @Override
+        public boolean stop() throws IOException {
+            dataProvider.stop();
+            return true;
+        }
+
+        /**
+         * Submits the data provider to the executor. Both getInputStream() and the first read() call this, so it
+         * must submit only once; otherwise two generator threads would write into the same pipe concurrently.
+         */
+        public synchronized void start() {
+            if (!started) {
+                executorService.execute(dataProvider);
+                started = true;
+            }
+        }
+
+        @Override
+        public boolean skipError() throws Exception {
+            return false;
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (!started) {
+                start();
+            }
+            return in.read();
+        }
+
+        @Override
+        public int read(byte b[], int off, int len) throws IOException {
+            if (!started) {
+                start();
+            }
+            return in.read(b, off, len);
+        }
+
+        /**
+         * Stops the provider, lets the executor release its threads once the provider exits, and closes the
+         * read end of the pipe so further writes by the provider fail instead of blocking on a full pipe.
+         */
+        @Override
+        public void close() throws IOException {
+            dataProvider.stop();
+            executorService.shutdown();
+            in.close();
+        }
+    }
+
+    /**
+     * Background task that drives the {@link TweetGenerator} and closes the output stream when finished.
+     */
+    private static class DataProvider implements Runnable {
+
+        public static final String KEY_MODE = "mode";
+
+        private final TweetGenerator tweetGenerator;
+        // written by stop() on the consumer's thread, read by run() on the executor thread
+        private volatile boolean continuePush = true;
+        private int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
+        public static enum Mode {
+            AGGRESSIVE,
+            CONTROLLED
+        }
+
+        /**
+         * @param configuration read for {@code mode} (AGGRESSIVE when absent) and, in CONTROLLED mode, the TPS
+         *            value used as the batch size generated per second
+         * @param partition forwarded to the {@link TweetGenerator}
+         * @param os the stream the generator is registered to write to
+         * @throws IllegalArgumentException if {@code mode} is unknown, or CONTROLLED mode has no TPS value
+         */
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) throws Exception {
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
+            this.os = os;
+            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
+                    : Mode.AGGRESSIVE;
+            switch (mode) {
+                case CONTROLLED:
+                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                    if (tpsValue == null) {
+                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                    }
+                    batchSize = Integer.parseInt(tpsValue);
+                    break;
+                case AGGRESSIVE:
+                    // fixed batch size; batches are generated back-to-back with no pacing
+                    batchSize = 5000;
+                    break;
+            }
+        }
+
+        /**
+         * Generates batches until the generator reports no more data or {@link #stop()} is called, then closes the
+         * output stream. In CONTROLLED mode each batch is padded to take at least one second. An interrupt ends
+         * generation; other exceptions are logged and generation is retried.
+         * NOTE(review): a persistent failure (e.g. a broken pipe while continuePush is still true) retries forever.
+         */
+        @Override
+        public void run() {
+            boolean moreData = true;
+            long startBatch;
+            long endBatch;
+
+            while (true) {
+                try {
+                    while (moreData && continuePush) {
+                        switch (mode) {
+                            case AGGRESSIVE:
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                break;
+                            case CONTROLLED:
+                                startBatch = System.currentTimeMillis();
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                endBatch = System.currentTimeMillis();
+                                if (endBatch - startBatch < 1000) {
+                                    Thread.sleep(1000 - (endBatch - startBatch));
+                                }
+                                break;
+                        }
+                    }
+                    os.close();
+                    break;
+                } catch (InterruptedException e) {
+                    // retrying would just sleep again; honour the interrupt, and close the pipe so the reader ends
+                    Thread.currentThread().interrupt();
+                    closeQuietly();
+                    break;
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Exception in adaptor " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+
+        /** Closes the output stream, logging instead of propagating any failure. */
+        private void closeQuietly() {
+            try {
+                os.close();
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Failed to close tweet output stream", e);
+                }
+            }
+        }
+
+        /** Requests that run() stop generating after the current batch. */
+        public void stop() {
+            continuePush = false;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
new file mode 100644
index 0000000..14c712a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.File;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.INodeResolver;
+import org.apache.asterix.external.api.INodeResolverFactory;
+import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
+import org.apache.asterix.external.util.DNSResolverFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * Factory for stream providers that read files from node-local file systems. The {@code path} parameter is a
+ * comma-separated list of {@code host://absolute-path} entries; each entry becomes one file split and one
+ * partition.
+ */
+public class LocalFSInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
+    protected static final Logger LOGGER = Logger.getLogger(LocalFSInputStreamProviderFactory.class.getName());
+    // volatile: getNodeResolver() reads this outside the lock (double-checked initialization)
+    protected static volatile INodeResolver nodeResolver;
+    protected Map<String, String> configuration;
+    protected FileSplit[] fileSplits;
+
+    @Override
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+        return new LocalFSInputStreamProvider(fileSplits, ctx, configuration, partition);
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Stores the configuration and parses its {@code path} parameter into file splits.
+     *
+     * @throws AsterixException if {@code path} is missing or an entry is not of the form host://path
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        String pathValue = configuration.get(ExternalDataConstants.KEY_PATH);
+        if (pathValue == null) {
+            throw new AsterixException("Missing parameter: " + ExternalDataConstants.KEY_PATH
+                    + "\nUsage- path=\"Host://Absolute File Path\"");
+        }
+        configureFileSplits(pathValue.split(","));
+    }
+
+    /**
+     * Builds an absolute partition constraint with one location per file split.
+     */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return configurePartitionConstraint();
+    }
+
+    /**
+     * Parses {@code host://path} entries into file splits. Only runs once: later calls keep the existing splits.
+     */
+    private void configureFileSplits(String[] splits) throws AsterixException {
+        if (fileSplits == null) {
+            // fill a local array so a malformed entry does not leave a half-built fileSplits behind
+            FileSplit[] parsedSplits = new FileSplit[splits.length];
+            int count = 0;
+            for (String splitPath : splits) {
+                String trimmedValue = splitPath.trim();
+                int separator = trimmedValue.indexOf("://");
+                // the local path is everything after the first "://": it must be non-empty and may contain "://"
+                if (separator < 0 || separator + 3 == trimmedValue.length()) {
+                    throw new AsterixException(
+                            "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
+                }
+                String nodeName = trimmedValue.substring(0, trimmedValue.indexOf(':'));
+                String nodeLocalPath = trimmedValue.substring(separator + 3);
+                parsedSplits[count++] = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+            }
+            fileSplits = parsedSplits;
+        }
+    }
+
+    private AlgebricksPartitionConstraint configurePartitionConstraint() throws AsterixException {
+        String[] locs = new String[fileSplits.length];
+        String location;
+        for (int i = 0; i < fileSplits.length; i++) {
+            location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+            locs[i] = location;
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locs);
+    }
+
+    /**
+     * Returns the shared node resolver, creating it on first use.
+     */
+    protected INodeResolver getNodeResolver() {
+        INodeResolver resolver = nodeResolver;
+        if (resolver == null) {
+            synchronized (DEFAULT_NODE_RESOLVER) {
+                resolver = nodeResolver;
+                if (resolver == null) {
+                    resolver = initializeNodeResolver();
+                    nodeResolver = resolver;
+                }
+            }
+        }
+        return resolver;
+    }
+
+    /**
+     * Instantiates the resolver factory named by the node-resolver system property, falling back to the
+     * DNS-based default when the property is unset or the class cannot be instantiated.
+     */
+    private static INodeResolver initializeNodeResolver() {
+        INodeResolver nodeResolver = null;
+        String configuredNodeResolverFactory = System.getProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY);
+        if (configuredNodeResolverFactory != null) {
+            try {
+                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
+                        .createNodeResolver();
+
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
+                            + configuredNodeResolverFactory + "\n" + e.getMessage(), e);
+                }
+                nodeResolver = DEFAULT_NODE_RESOLVER;
+            }
+        } else {
+            nodeResolver = DEFAULT_NODE_RESOLVER;
+        }
+        return nodeResolver;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
new file mode 100644
index 0000000..37afa53
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.SocketInputStreamProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for stream providers that listen on server sockets, one {@code host:port} entry per partition.
+ */
+public class SocketInputStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private List<Pair<String, Integer>> sockets;
+    private Mode mode = Mode.IP;
+
+    /**
+     * How the host part of each {@code sockets} entry is interpreted.
+     */
+    public static enum Mode {
+        /** the host is the name of a node controller */
+        NC,
+        /** the host is an address; one of the node controllers on it is picked at random */
+        IP
+    }
+
+    /**
+     * Parses the comma-separated {@code host:port} list in the {@code sockets} parameter and resolves each entry
+     * to a (node controller, port) pair according to the {@code mode} parameter.
+     *
+     * @throws IllegalArgumentException if {@code sockets} is missing, an entry is not of the form host:port, or a
+     *             host/NC is not part of the cluster
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        sockets = new ArrayList<Pair<String, Integer>>();
+        String modeValue = configuration.get(ExternalDataConstants.KEY_MODE);
+        if (modeValue != null) {
+            mode = Mode.valueOf(modeValue.trim().toUpperCase());
+        }
+        String socketsValue = configuration.get(ExternalDataConstants.KEY_SOCKETS);
+        if (socketsValue == null) {
+            throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adapter configuration");
+        }
+        Map<InetAddress, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+        List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+        String[] socketsArray = socketsValue.split(",");
+        Random random = new Random();
+        for (String socket : socketsArray) {
+            String[] socketTokens = socket.split(":");
+            // reject entries such as "" (from ",,") or "host" instead of failing with an index exception
+            if (socketTokens.length < 2) {
+                throw new IllegalArgumentException("Invalid socket \"" + socket + "\"; expected <host>:<port>");
+            }
+            String host = socketTokens[0].trim();
+            int port = Integer.parseInt(socketTokens[1].trim());
+            Pair<String, Integer> p = null;
+            switch (mode) {
+                case IP:
+                    Set<String> ncsOnIp = ncMap.get(InetAddress.getByName(host));
+                    if (ncsOnIp == null || ncsOnIp.isEmpty()) {
+                        throw new IllegalArgumentException("Invalid host " + host
+                                + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                + StringUtils.join(ncMap.keySet(), ", "));
+                    }
+                    String[] ncArray = ncsOnIp.toArray(new String[] {});
+                    String nc = ncArray[random.nextInt(ncArray.length)];
+                    p = new Pair<String, Integer>(nc, port);
+                    break;
+
+                case NC:
+                    p = new Pair<String, Integer>(host, port);
+                    if (!ncs.contains(host)) {
+                        throw new IllegalArgumentException(
+                                "Invalid NC " + host + " as it is not part of the AsterixDB cluster. Valid choices are "
+                                        + StringUtils.join(ncs, ", "));
+
+                    }
+                    break;
+            }
+            sockets.add(p);
+        }
+    }
+
+    /**
+     * Binds a server socket on the port configured for {@code partition}.
+     */
+    @Override
+    public synchronized IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws IOException, AsterixException {
+        Pair<String, Integer> socket = sockets.get(partition);
+        ServerSocket server = new ServerSocket(socket.second);
+        return new SocketInputStreamProvider(server);
+    }
+
+    /**
+     * Places one partition on the node controller of each configured socket, in configuration order.
+     */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() {
+        List<String> locations = new ArrayList<String>();
+        for (Pair<String, Integer> socket : sockets) {
+            locations.add(socket.first);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
+    }
+
+    public List<Pair<String, Integer>> getSockets() {
+        return sockets;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
new file mode 100644
index 0000000..b86c294
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamProviderFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for stream providers that simulate a twitter firehose, with tweets being "pushed" into Asterix at a
+ * rate that can be configured in TPS (tweets/second). Generator parameters are read from the adapter
+ * configuration by the underlying tweet generator.
+ */
+public class TwitterFirehoseStreamProviderFactory implements IInputStreamProviderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Degree of parallelism for feed ingestion activity. Defaults to the number of configured ingestion locations,
+     * or 1 when none are configured. This determines the count constraint for the ingestion operator.
+     **/
+    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+
+    /**
+     * Comma-separated absolute locations where ingestion operator instances will be placed.
+     **/
+    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
+    private Map<String, String> configuration;
+
+    /**
+     * Chooses {@code ingestion-cardinality} locations round-robin from the configured ingestion locations, or
+     * from all participant nodes when none are configured.
+     *
+     * @throws IllegalStateException if partitions are requested but no location is available
+     */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+        String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
+        String[] locations = null;
+        if (ingestionLocationParam != null) {
+            locations = ingestionLocationParam.split(",");
+            // tolerate "nc1, nc2"
+            for (int i = 0; i < locations.length; i++) {
+                locations[i] = locations[i].trim();
+            }
+        }
+        int count = locations != null ? locations.length : 1;
+        if (ingestionCardinalityParam != null) {
+            count = Integer.parseInt(ingestionCardinalityParam.trim());
+        }
+
+        String[] availableLocations = locations != null ? locations
+                : AsterixClusterProperties.INSTANCE.getParticipantNodes().toArray(new String[] {});
+        if (count > 0 && availableLocations.length == 0) {
+            throw new IllegalStateException("No locations available for twitter firehose ingestion");
+        }
+        List<String> chosenLocations = new ArrayList<String>();
+        for (int i = 0; i < count; i++) {
+            chosenLocations.add(availableLocations[i % availableLocations.length]);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    /**
+     * Creates the tweet-generating stream provider for {@code partition}. Previously this returned {@code null},
+     * which made every caller fail with a NullPointerException.
+     */
+    @Override
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception {
+        return new org.apache.asterix.external.input.stream.TwitterFirehoseInputStreamProvider(configuration, ctx,
+                partition);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index 14f831b..e9c15cb 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -21,6 +21,9 @@ package org.apache.asterix.external.library;
 import java.io.IOException;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalFunction;
+import org.apache.asterix.external.api.IFunctionFactory;
+import org.apache.asterix.external.api.IFunctionHelper;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.ATypeTag;