You are viewing a plain-text version of this content. The canonical link to the original message is not preserved in this plain-text rendering.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/30 00:52:17 UTC

[4/5] incubator-asterixdb git commit: Add flush() to IFrameWriter

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
deleted file mode 100644
index 3a82a68..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.ILookupReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.HDFSLookupReaderFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-
-public class LookupReaderFactoryProvider {
-
-    @SuppressWarnings("rawtypes")
-    public static ILookupReaderFactory getLookupReaderFactory(Map<String, String> configuration) throws Exception {
-        String inputFormat = HDFSUtils.getInputFormatClassName(configuration);
-        if (inputFormat.equals(ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT)
-                || inputFormat.equals(ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)
-                || inputFormat.equals(ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT)) {
-            HDFSLookupReaderFactory<Object> readerFactory = new HDFSLookupReaderFactory<Object>();
-            readerFactory.configure(configuration);
-            return readerFactory;
-        } else {
-            throw new AsterixException("Unrecognized external format");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
deleted file mode 100644
index 49e67e9..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-
-public class QuotedLineRecordReader extends LineRecordReader {
-
-    private char quote;
-    private boolean prevCharEscape;
-    private boolean inQuote;
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        super.configure(configuration);
-        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
-        if (quoteString == null || quoteString.length() != 1) {
-            throw new AsterixException(ExternalDataExceptionUtils.incorrectParameterMessage(
-                    ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
-        }
-        this.quote = quoteString.charAt(0);
-    }
-
-    @Override
-    public boolean hasNext() throws IOException {
-        if (done) {
-            return false;
-        }
-        newlineLength = 0;
-        prevCharCR = false;
-        prevCharEscape = false;
-        record.reset();
-        int readLength = 0;
-        inQuote = false;
-        do {
-            int startPosn = bufferPosn;
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                bufferLength = reader.read(inputBuffer);
-                if (bufferLength <= 0) {
-                    {
-                        if (readLength > 0) {
-                            if (inQuote) {
-                                throw new IOException("malformed input record ended inside quote");
-                            }
-                            record.endRecord();
-                            recordNumber++;
-                            return true;
-                        }
-                        close();
-                        return false;
-                    }
-                }
-            }
-            for (; bufferPosn < bufferLength; ++bufferPosn) {
-                if (!inQuote) {
-                    if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
-                        newlineLength = (prevCharCR) ? 2 : 1;
-                        ++bufferPosn;
-                        break;
-                    }
-                    if (prevCharCR) {
-                        newlineLength = 1;
-                        break;
-                    }
-                    prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-                    if (inputBuffer[bufferPosn] == quote) {
-                        if (!prevCharEscape) {
-                            inQuote = true;
-                        }
-                    }
-                    if (prevCharEscape) {
-                        prevCharEscape = false;
-                    } else {
-                        prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
-                    }
-                } else {
-                    // only look for next quote
-                    if (inputBuffer[bufferPosn] == quote) {
-                        if (!prevCharEscape) {
-                            inQuote = false;
-                        }
-                    }
-                    prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
-                }
-            }
-            readLength = bufferPosn - startPosn;
-            if (prevCharCR && newlineLength == 0) {
-                --readLength;
-            }
-            if (readLength > 0) {
-                record.append(inputBuffer, startPosn, readLength);
-            }
-        } while (newlineLength == 0);
-        recordNumber++;
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
deleted file mode 100644
index 5c33502..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-
-public class RCLookupReader extends AbstractHDFSLookupRecordReader<BytesRefArrayWritable> {
-    public RCLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
-        super(snapshotAccessor, fs, conf);
-    }
-
-    private static final Logger LOGGER = Logger.getLogger(RCLookupReader.class.getName());
-    private Reader reader;
-    private LongWritable key = new LongWritable();
-    private BytesRefArrayWritable value = new BytesRefArrayWritable();
-    private GenericRecord<BytesRefArrayWritable> record = new GenericRecord<BytesRefArrayWritable>();
-    private long offset;
-    private int row;
-
-    @Override
-    public Class<?> getRecordClass() throws IOException {
-        return Writable.class;
-    }
-
-    @Override
-    protected IRawRecord<BytesRefArrayWritable> lookup(RecordId rid) throws IOException {
-        if (rid.getOffset() != offset) {
-            offset = rid.getOffset();
-            if (reader.getPosition() != offset)
-                reader.seek(offset);
-            reader.resetBuffer();
-            row = -1;
-        }
-
-        // skip rows to the record row
-        while (row < rid.getRow()) {
-            reader.next(key);
-            reader.getCurrentRow(value);
-            row++;
-        }
-        record.set(value);
-        return record;
-    }
-
-    @Override
-    protected void closeFile() {
-        if (reader == null) {
-            return;
-        }
-        try {
-            reader.close();
-        } catch (Exception e) {
-            LOGGER.warn("Error closing HDFS file", e);
-        }
-    }
-
-    @Override
-    protected void openFile() throws IllegalArgumentException, IOException {
-        reader = new Reader(fs, new Path(file.getFileName()), conf);
-        offset = -1;
-        row = -1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
deleted file mode 100644
index 1c2dc30..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.log4j.Logger;
-
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.fetcher.FeedFetcher;
-import com.sun.syndication.fetcher.FetcherEvent;
-import com.sun.syndication.fetcher.FetcherException;
-import com.sun.syndication.fetcher.FetcherListener;
-import com.sun.syndication.fetcher.impl.FeedFetcherCache;
-import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
-import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
-import com.sun.syndication.io.FeedException;
-
-public class RSSRecordReader implements IRecordReader<SyndEntryImpl> {
-
-    private static final Logger LOGGER = Logger.getLogger(RSSRecordReader.class.getName());
-    private boolean modified = false;
-    private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
-    private FeedFetcherCache feedInfoCache;
-    private FeedFetcher fetcher;
-    private FetcherEventListenerImpl listener;
-    private URL feedUrl;
-    private GenericRecord<SyndEntryImpl> record = new GenericRecord<SyndEntryImpl>();
-    private boolean done = false;
-
-    public RSSRecordReader(String url) throws MalformedURLException {
-        feedUrl = new URL(url);
-        feedInfoCache = HashMapFeedInfoCache.getInstance();
-        fetcher = new HttpURLFeedFetcher(feedInfoCache);
-        listener = new FetcherEventListenerImpl(this, LOGGER);
-        fetcher.addFetcherEventListener(listener);
-    }
-
-    public boolean isModified() {
-        return modified;
-    }
-
-    @Override
-    public void close() throws IOException {
-        fetcher.removeFetcherEventListener(listener);
-    }
-
-    @Override
-    public void configure(Map<String, String> configurations) throws Exception {
-    }
-
-    @Override
-    public boolean hasNext() throws Exception {
-        return !done;
-    }
-
-    @Override
-    public IRawRecord<SyndEntryImpl> next() throws IOException {
-        if (done) {
-            return null;
-        }
-        try {
-            SyndEntryImpl feedEntry;
-            feedEntry = getNextRSSFeed();
-            if (feedEntry == null) {
-                return null;
-            }
-            record.set(feedEntry);
-            return record;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public Class<SyndEntryImpl> getRecordClass() throws IOException {
-        return SyndEntryImpl.class;
-    }
-
-    @Override
-    public boolean stop() {
-        done = true;
-        return true;
-    }
-
-    public void setModified(boolean modified) {
-        this.modified = modified;
-    }
-
-    private SyndEntryImpl getNextRSSFeed() throws Exception {
-        if (rssFeedBuffer.isEmpty()) {
-            fetchFeed();
-        }
-        if (rssFeedBuffer.isEmpty()) {
-            return null;
-        } else {
-            return rssFeedBuffer.remove();
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void fetchFeed() throws IllegalArgumentException, IOException, FeedException, FetcherException {
-        // Retrieve the feed.
-        // We will get a Feed Polled Event and then a
-        // Feed Retrieved event (assuming the feed is valid)
-        SyndFeed feed = fetcher.retrieveFeed(feedUrl);
-        if (modified) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info(feedUrl + " retrieved");
-                LOGGER.info(feedUrl + " has a title: " + feed.getTitle() + " and contains " + feed.getEntries().size()
-                        + " entries.");
-            }
-            List<? extends SyndEntryImpl> fetchedFeeds = feed.getEntries();
-            rssFeedBuffer.addAll(fetchedFeeds);
-        }
-    }
-}
-
-class FetcherEventListenerImpl implements FetcherListener {
-
-    private RSSRecordReader reader;
-    private Logger LOGGER;
-
-    public FetcherEventListenerImpl(RSSRecordReader reader, Logger LOGGER) {
-        this.reader = reader;
-        this.LOGGER = LOGGER;
-    }
-
-    /**
-     * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
-     */
-    @Override
-    public void fetcherEvent(FetcherEvent event) {
-        String eventType = event.getEventType();
-        if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("\tEVENT: Feed Polled. URL = " + event.getUrlString());
-            }
-        } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
-            }
-            (reader).setModified(true);
-        } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
-            }
-            (reader).setModified(true);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
deleted file mode 100644
index 84c96d0..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
-
-public class SemiStructuredRecordReader extends AbstractStreamRecordReader {
-
-    private int depth;
-    private boolean prevCharEscape;
-    private boolean inString;
-    private char recordStart;
-    private char recordEnd;
-    private int recordNumber = 0;
-
-    public int getRecordNumber() {
-        return recordNumber;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        super.configure(configuration);
-        String recStartString = configuration.get(ExternalDataConstants.KEY_RECORD_START);
-        String recEndString = configuration.get(ExternalDataConstants.KEY_RECORD_END);
-        if (recStartString != null) {
-            if (recStartString.length() != 1) {
-                throw new AsterixException(
-                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
-                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
-            }
-            recordStart = recStartString.charAt(0);
-        } else {
-            recordStart = ExternalDataConstants.DEFAULT_RECORD_START;
-        }
-        if (recEndString != null) {
-            if (recEndString.length() != 1) {
-                throw new AsterixException(
-                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
-                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
-            }
-            recordEnd = recEndString.charAt(0);
-        } else {
-            recordEnd = ExternalDataConstants.DEFAULT_RECORD_END;
-        }
-    }
-
-    @Override
-    public boolean hasNext() throws Exception {
-        if (done) {
-            return false;
-        }
-        record.reset();
-        boolean hasStarted = false;
-        boolean hasFinished = false;
-        prevCharEscape = false;
-        inString = false;
-        depth = 0;
-        do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                bufferLength = reader.read(inputBuffer);
-                if (bufferLength <= 0) {
-                    close();
-                    return false; // EOF
-                }
-            }
-            if (!hasStarted) {
-                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
-                    if (inputBuffer[bufferPosn] == recordStart) {
-                        startPosn = bufferPosn;
-                        hasStarted = true;
-                        depth = 1;
-                        ++bufferPosn; // at next invocation proceed from following byte
-                        break;
-                    } else if (inputBuffer[bufferPosn] != ExternalDataConstants.SPACE
-                            && inputBuffer[bufferPosn] != ExternalDataConstants.TAB
-                            && inputBuffer[bufferPosn] != ExternalDataConstants.LF
-                            && inputBuffer[bufferPosn] != ExternalDataConstants.CR) {
-                        // corrupted file. clear the buffer and stop reading
-                        reader.skipError();
-                        bufferPosn = bufferLength = 0;
-                        throw new IOException("Malformed input stream");
-                    }
-                }
-            }
-            if (hasStarted) {
-                for (; bufferPosn < bufferLength; ++bufferPosn) { //search for record begin
-                    if (inString) {
-                        // we are in a string, we only care about the string end
-                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
-                            inString = false;
-                        }
-                        if (prevCharEscape) {
-                            prevCharEscape = false;
-                        } else {
-                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
-                        }
-                    } else {
-                        if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
-                            inString = true;
-                        } else if (inputBuffer[bufferPosn] == recordStart) {
-                            depth += 1;
-                        } else if (inputBuffer[bufferPosn] == recordEnd) {
-                            depth -= 1;
-                            if (depth == 0) {
-                                hasFinished = true;
-                                ++bufferPosn; // at next invocation proceed from following byte
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-
-            int appendLength = bufferPosn - startPosn;
-            if (appendLength > 0) {
-                record.append(inputBuffer, startPosn, appendLength);
-            }
-        } while (!hasFinished);
-        record.endRecord();
-        recordNumber++;
-        return true;
-    }
-
-    @Override
-    public boolean stop() {
-        try {
-            reader.stop();
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
deleted file mode 100644
index c294ccb..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-
-public class SequenceLookupReader extends AbstractCharRecordLookupReader {
-
-    public SequenceLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
-        super(snapshotAccessor, fs, conf);
-    }
-
-    private static final Logger LOGGER = Logger.getLogger(SequenceLookupReader.class.getName());
-    private Reader reader;
-    private Writable key;
-
-    @Override
-    protected void readRecord(RecordId rid) throws IOException {
-        reader.seek(rid.getOffset());
-        reader.next(key, value);
-    }
-
-    @Override
-    protected void closeFile() {
-        if (reader == null) {
-            return;
-        }
-        try {
-            reader.close();
-        } catch (Exception e) {
-            LOGGER.warn("Error closing HDFS file ", e);
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    protected void openFile() throws IllegalArgumentException, IOException {
-        reader = new SequenceFile.Reader(fs, new Path(file.getFileName()), conf);
-        key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-        value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
deleted file mode 100644
index b276bfa..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Lookup reader for line-oriented text files on HDFS: seeks to the byte offset carried by a record id and
- * reads a single line into the inherited {@code value} field.
- */
-public class TextLookupReader extends AbstractCharRecordLookupReader {
-
-    private static final Logger LOGGER = Logger.getLogger(TextLookupReader.class.getName());
-
-    /** Line reader, created on the first {@link #openFile()} and re-pointed at each newly opened file. */
-    private HDFSTextLineReader reader;
-
-    public TextLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs, Configuration conf) {
-        super(snapshotAccessor, fs, conf);
-    }
-
-    /** Seeks to the record's offset and reads the line found there into value. */
-    @Override
-    protected void readRecord(RecordId rid) throws IOException {
-        reader.seek(rid.getOffset());
-        reader.readLine(value);
-    }
-
-    /** Closes the line reader, if one exists; failures are logged rather than propagated. */
-    @Override
-    protected void closeFile() {
-        if (reader != null) {
-            try {
-                reader.close();
-            } catch (Exception e) {
-                LOGGER.warn("Error closing HDFS file ", e);
-            }
-        }
-    }
-
-    /** Lazily creates the line reader, then resets it onto a freshly opened stream for the current file. */
-    @Override
-    protected void openFile() throws IllegalArgumentException, IOException {
-        if (reader == null) {
-            reader = new HDFSTextLineReader();
-        }
-        reader.resetReader(fs.open(new Path(file.getFileName())));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
deleted file mode 100644
index 34d8122..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-/**
- * Pull-based Twitter record reader that repeatedly runs a search query and returns the matching tweets
- * one at a time.
- * <p>
- * When the current result page is exhausted, {@link #next()} sleeps for the configured request interval
- * and then issues a new search restricted (via since-id) to tweets newer than the newest tweet seen so far.
- * The returned record object is reused between calls.
- */
-public class TwitterPullRecordReader implements IRecordReader<Status> {
-
-    /** Interval used when the configuration does not provide one, in seconds. */
-    private static final int DEFAULT_REQUEST_INTERVAL_SECONDS = 5;
-    /** Page size passed to {@link Query#setCount(int)}. */
-    private static final int TWEETS_PER_REQUEST = 100;
-
-    private String keywords;
-    private Query query;
-    private Twitter twitter;
-    private int requestInterval = DEFAULT_REQUEST_INTERVAL_SECONDS; // seconds
-    private QueryResult result;
-    private int nextTweetIndex = 0;
-    private long lastTweetIdReceived = 0;
-    private GenericRecord<Status> record;
-
-    public TwitterPullRecordReader() {
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    /**
-     * Creates the Twitter client and the search query from the configuration.
-     *
-     * @throws NumberFormatException
-     *             if an interval is configured but is not an integer
-     */
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        twitter = TwitterUtil.getTwitterService(configuration);
-        keywords = configuration.get(SearchAPIConstants.QUERY);
-        String interval = configuration.get(SearchAPIConstants.INTERVAL);
-        if (interval != null) {
-            // Keep the default when no interval is configured instead of failing in parseInt(null).
-            requestInterval = Integer.parseInt(interval.trim());
-        }
-        query = new Query(keywords);
-        query.setCount(TWEETS_PER_REQUEST);
-        record = new GenericRecord<Status>();
-    }
-
-    /** Always {@code true}: this reader polls indefinitely. */
-    @Override
-    public boolean hasNext() throws Exception {
-        return true;
-    }
-
-    /**
-     * Returns the next tweet, fetching (after sleeping for the request interval) a new result page when the
-     * current one is used up.
-     *
-     * @return the reused record holding the next tweet, or {@code null} if the latest search returned nothing
-     */
-    @Override
-    public IRawRecord<Status> next() throws IOException, InterruptedException {
-        if (result == null || nextTweetIndex >= result.getTweets().size()) {
-            Thread.sleep(1000 * requestInterval);
-            // Only ask for tweets newer than the newest one already delivered.
-            query.setSinceId(lastTweetIdReceived);
-            try {
-                result = twitter.search(query);
-            } catch (TwitterException e) {
-                throw new HyracksDataException(e);
-            }
-            nextTweetIndex = 0;
-        }
-        if (result != null && !result.getTweets().isEmpty()) {
-            List<Status> tw = result.getTweets();
-            Status tweet = tw.get(nextTweetIndex++);
-            if (lastTweetIdReceived < tweet.getId()) {
-                lastTweetIdReceived = tweet.getId();
-            }
-            record.set(tweet);
-            return record;
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Class<Status> getRecordClass() throws IOException {
-        return Status.class;
-    }
-
-    /** Stopping is not supported; always returns {@code false}. */
-    @Override
-    public boolean stop() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
deleted file mode 100644
index 3ce6a81..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.TwitterUtil;
-
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-
-/**
- * Push-based Twitter record reader: a streaming listener enqueues incoming tweets and {@link #next()} polls
- * them from an unbounded queue without blocking.
- */
-public class TwitterPushRecordReader implements IRecordReader<Status> {
-    private LinkedBlockingQueue<Status> inputQ;
-    private TwitterStream twitterStream;
-    private GenericRecord<Status> record;
-    private boolean closed = false;
-
-    /**
-     * Detaches the listener and cleans up the stream. Safe to call more than once, and safe to call when
-     * {@link #configure(Map)} never created a stream.
-     */
-    @Override
-    public void close() throws IOException {
-        if (!closed) {
-            if (twitterStream != null) {
-                twitterStream.clearListeners();
-                twitterStream.cleanUp();
-                twitterStream = null;
-            }
-            closed = true;
-        }
-    }
-
-    /** Opens the stream and starts either a filtered stream (if a filter query is configured) or a sample. */
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        record = new GenericRecord<Status>();
-        inputQ = new LinkedBlockingQueue<Status>();
-        twitterStream = TwitterUtil.getTwitterStream(configuration);
-        twitterStream.addListener(new TweetListener(inputQ));
-        FilterQuery query = TwitterUtil.getFilterQuery(configuration);
-        if (query != null) {
-            twitterStream.filter(query);
-        } else {
-            twitterStream.sample();
-        }
-    }
-
-    @Override
-    public boolean hasNext() throws Exception {
-        return !closed;
-    }
-
-    /**
-     * @return the reused record holding the next queued tweet, or {@code null} if none is queued right now
-     */
-    @Override
-    public IRawRecord<Status> next() throws IOException, InterruptedException {
-        Status tweet = inputQ.poll();
-        if (tweet == null) {
-            return null;
-        }
-        record.set(tweet);
-        return record;
-    }
-
-    @Override
-    public Class<? extends Status> getRecordClass() throws IOException {
-        return Status.class;
-    }
-
-    /** Closes the reader; returns {@code false} if closing failed. */
-    @Override
-    public boolean stop() {
-        try {
-            close();
-        } catch (Exception e) {
-            return false;
-        }
-        return true;
-    }
-
-    /** Stream listener that only forwards statuses into the shared queue; it needs no enclosing instance. */
-    private static class TweetListener implements StatusListener {
-
-        private LinkedBlockingQueue<Status> inputQ;
-
-        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
-            this.inputQ = inputQ;
-        }
-
-        @Override
-        public void onStatus(Status tweet) {
-            inputQ.add(tweet);
-        }
-
-        @Override
-        public void onException(Exception arg0) {
-            // NOTE(review): stream errors are silently ignored here; consider logging them.
-        }
-
-        @Override
-        public void onDeletionNotice(StatusDeletionNotice arg0) {
-        }
-
-        @Override
-        public void onScrubGeo(long arg0, long arg1) {
-        }
-
-        @Override
-        public void onStallWarning(StallWarning arg0) {
-        }
-
-        @Override
-        public void onTrackLimitationNotice(int arg0) {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
new file mode 100644
index 0000000..895af1b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.dcp.BucketStreamAggregator;
+import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
+import com.couchbase.client.core.dcp.BucketStreamState;
+import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+
+import rx.functions.Action1;
+
+/**
+ * Record reader that streams document mutations from a fixed set of Couchbase vbuckets over DCP.
+ * <p>
+ * {@link #configure(Map)} seeds the cluster nodes, opens the bucket and starts a dedicated thread (named after
+ * the feed) that puts every {@link MutationMessage} into a bounded queue. {@link #next()} takes messages from
+ * that queue and exposes each one as its UTF-8 decoded content plus the metadata fields described by
+ * {@code metaTypes}. Snapshot markers only update the local stream state; remove messages are only logged.
+ * <p>
+ * The record returned by {@link #next()} is reused between calls.
+ */
+public class CouchbaseReader implements IRecordReader<RecordWithMetadata<char[]>> {
+
+    /** Sentinel enqueued by {@link #stop()} so that a consumer blocked in {@link #next()} wakes up. */
+    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
+            null);
+    private final String feedName;
+    private final short[] vbuckets;
+    private final String bucket;
+    private final String password;
+    private final String[] couchbaseNodes;
+    private AbstractFeedDataFlowController controller;
+    private Builder builder;
+    private BucketStreamAggregator bucketStreamAggregator;
+    private CouchbaseCore core;
+    private DefaultCoreEnvironment env;
+    private Thread pushThread;
+    private ArrayBlockingQueue<MutationMessage> messages;
+    private GenericRecord<RecordWithMetadata<char[]>> record;
+    private RecordWithMetadata<char[]> recordWithMetadata;
+    // volatile: may be set from the DCP push thread (via close()) and is read by hasNext() on the consumer side.
+    private volatile boolean done = false;
+    private CharArrayRecord value;
+    private CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    private ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long), creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
+    private static final IAType[] metaTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
+    private static final Logger LOGGER = Logger.getLogger(CouchbaseReader.class);
+
+    /**
+     * @param feedName
+     *            name used for the DCP stream and for the push thread
+     * @param vbuckets
+     *            the vbuckets this reader streams
+     * @param queueSize
+     *            capacity of the queue between the push thread and {@link #next()}
+     */
+    public CouchbaseReader(String feedName, String bucket, String password, String[] couchbaseNodes, short[] vbuckets,
+            int queueSize) throws HyracksDataException {
+        this.feedName = feedName;
+        this.bucket = bucket;
+        this.password = password;
+        this.couchbaseNodes = couchbaseNodes;
+        this.vbuckets = vbuckets;
+        this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes, char[].class);
+        this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
+        this.value = new CharArrayRecord();
+        recordWithMetadata.setRecord(value);
+        this.record = new GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
+    }
+
+    /**
+     * Marks the reader as finished so that {@link #hasNext()} returns {@code false}.
+     * NOTE(review): this does not stop the push thread or close the bucket.
+     */
+    @Override
+    public void close() {
+        done = true;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.builder = DefaultCoreEnvironment.builder().dcpEnabled(CouchbaseReaderFactory.DCP_ENABLED)
+                .autoreleaseAfter(CouchbaseReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
+        this.env = builder.build();
+        this.core = new CouchbaseCore(env);
+        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+        connect();
+    }
+
+    /** Seeds the nodes, opens the bucket (waiting up to the factory timeout for each) and starts the push thread. */
+    private void connect() {
+        core.send(new SeedNodesRequest(couchbaseNodes))
+                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password))
+                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+        this.pushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                CouchbaseReader.this.run(bucketStreamAggregator);
+            }
+        }, feedName);
+        pushThread.start();
+    }
+
+    /**
+     * Body of the push thread: registers each assigned vbucket in a fresh stream state and then blocks, routing
+     * DCP messages until the feed ends or fails. If the failure is caused by an interrupt, the reader is closed
+     * and waiters on this object are notified before the error is rethrown.
+     */
+    private void run(BucketStreamAggregator bucketStreamAggregator) {
+        BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+        for (int i = 0; i < vbuckets.length; i++) {
+            // NOTE(review): 0xffffffff is the int -1; if these parameters are longs it widens to -1L (all bits set).
+            state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0, 0xffffffff));
+        }
+        state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>() {
+            @Override
+            public void call(BucketStreamStateUpdatedEvent event) {
+                // State update events are currently not used.
+            }
+        });
+        try {
+            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+                @Override
+                public void call(final DCPRequest dcpRequest) {
+                    try {
+                        if (dcpRequest instanceof SnapshotMarkerMessage) {
+                            // Advance the stored start/snapshot-start sequence numbers to the marker's end.
+                            SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
+                            final BucketStreamState oldState = state.get(message.partition());
+                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
+                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
+                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                        } else if (dcpRequest instanceof MutationMessage) {
+                            // Blocks while the queue is full.
+                            messages.put((MutationMessage) dcpRequest);
+                        } else if (dcpRequest instanceof RemoveMessage) {
+                            RemoveMessage message = (RemoveMessage) dcpRequest;
+                            LOGGER.info(message.key() + " was deleted.");
+                        }
+                    } catch (Throwable th) {
+                        // Log with the stack trace; the stream keeps running.
+                        LOGGER.error("Failed to process DCP message", th);
+                    }
+                }
+            });
+        } catch (Throwable th) {
+            if (th.getCause() instanceof InterruptedException) {
+                LOGGER.warn("dcp thread was interrupted", th);
+                synchronized (this) {
+                    CouchbaseReader.this.close();
+                    notifyAll();
+                }
+            }
+            throw th;
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return !done;
+    }
+
+    /**
+     * Takes the next mutation from the queue, blocking until one is available, and fills the reused record with
+     * its metadata and decoded content.
+     *
+     * @return the reused record, or {@code null} once {@link #stop()} has enqueued the stop marker
+     */
+    @Override
+    public IRawRecord<RecordWithMetadata<char[]>> next() throws IOException, InterruptedException {
+        if (controller != null && messages.isEmpty()) {
+            // Flush the controller before possibly blocking on an empty queue.
+            controller.flush();
+        }
+        MutationMessage message = messages.take();
+        if (message == POISON_PILL) {
+            return null;
+        }
+        String key = message.key();
+        int vbucket = message.partition();
+        long seq = message.bySequenceNumber();
+        String bucket = message.bucket();
+        long cas = message.cas();
+        long creationTime = message.creationTime();
+        int expiration = message.expiration();
+        int flags = message.flags();
+        long revSeqNumber = message.revisionSequenceNumber();
+        int lockTime = message.lockTime();
+        recordWithMetadata.reset();
+        recordWithMetadata.setMetadata(0, key);
+        recordWithMetadata.setMetadata(1, bucket);
+        recordWithMetadata.setMetadata(2, vbucket);
+        recordWithMetadata.setMetadata(3, seq);
+        recordWithMetadata.setMetadata(4, cas);
+        recordWithMetadata.setMetadata(5, creationTime);
+        recordWithMetadata.setMetadata(6, expiration);
+        recordWithMetadata.setMetadata(7, flags);
+        recordWithMetadata.setMetadata(8, revSeqNumber);
+        recordWithMetadata.setMetadata(9, lockTime);
+        CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
+        return record;
+    }
+
+    /**
+     * Marks the reader done, requests the bucket to be closed and enqueues the stop marker for {@link #next()}.
+     *
+     * @return {@code false} if interrupted while enqueueing the stop marker
+     */
+    @Override
+    public boolean stop() {
+        done = true;
+        // NOTE(review): toBlocking() alone subscribes to nothing, so completion of the close request is not awaited.
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+        try {
+            messages.put(CouchbaseReader.POISON_PILL);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted while enqueueing the stop marker", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setController(IDataFlowController controller) {
+        this.controller = (AbstractFeedDataFlowController) controller;
+    }
+
+    /**
+     * Decodes the readable bytes of {@code content} (reader index up to writer index) as UTF-8, appends the
+     * characters to {@code record} and ends the record.
+     * <p>
+     * Bytes are copied in chunks of at most {@code bytes.capacity()}. An incomplete multi-byte sequence at the
+     * end of a chunk is carried over to the next chunk instead of being dropped. Malformed or unmappable byte
+     * sequences are skipped. The reader/writer indices of {@code content} are not modified.
+     *
+     * @param content
+     *            buffer holding the encoded document
+     * @param decoder
+     *            decoder to use; it is reset before decoding
+     * @param bytes
+     *            scratch byte buffer; assumed to hold at least 4 bytes (one complete UTF-8 sequence)
+     * @param chars
+     *            scratch char buffer used to hand decoded characters to {@code record}
+     * @param record
+     *            record the decoded characters are appended to
+     */
+    public static void set(ByteBuf content, CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
+            CharArrayRecord record) {
+        int position = content.readerIndex();
+        // Only bytes below the writer index belong to the document; the capacity may be larger.
+        final int limit = content.writerIndex();
+        // A decoder that has reached end-of-input must be reset before it can decode again.
+        decoder.reset();
+        bytes.clear();
+        boolean endOfInput;
+        do {
+            // Top up the byte buffer, which may still hold an incomplete sequence from the previous chunk.
+            int count = Math.min(bytes.remaining(), limit - position);
+            if (count > 0) {
+                int savedLimit = bytes.limit();
+                bytes.limit(bytes.position() + count);
+                content.getBytes(position, bytes);
+                bytes.limit(savedLimit);
+                position += count;
+            }
+            endOfInput = position >= limit;
+            bytes.flip();
+            decodeChunk(decoder, bytes, chars, record, endOfInput);
+            // Keep any undecoded tail (a partial multi-byte sequence) for the next round.
+            bytes.compact();
+        } while (!endOfInput);
+        record.endRecord();
+    }
+
+    /** Decodes the readable part of {@code bytes} into {@code record}, draining {@code chars} whenever it fills. */
+    private static void decodeChunk(CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
+            CharArrayRecord record, boolean endOfInput) {
+        chars.clear();
+        while (true) {
+            CoderResult result = decoder.decode(bytes, chars, endOfInput);
+            if (result.isError()) {
+                // Skip the offending bytes so that decoding always makes progress.
+                bytes.position(bytes.position() + result.length());
+            } else if (result.isOverflow()) {
+                appendDecoded(chars, record);
+            } else {
+                break; // underflow: every complete sequence in this chunk has been decoded
+            }
+        }
+        if (endOfInput) {
+            while (decoder.flush(chars).isOverflow()) {
+                appendDecoded(chars, record);
+            }
+        }
+        appendDecoded(chars, record);
+    }
+
+    /** Appends the characters written into {@code chars} so far to {@code record} and empties the buffer. */
+    private static void appendDecoded(CharBuffer chars, CharArrayRecord record) {
+        chars.flip();
+        if (chars.hasRemaining()) {
+            record.append(chars);
+        }
+        chars.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
new file mode 100644
index 0000000..b9b6f65
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.config.CouchbaseBucketConfig;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+
+import rx.functions.Func1;
+
+public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMetadata<char[]>> {
+
+    private static final long serialVersionUID = 1L;
+    // Constant fields
+    public static final boolean DCP_ENABLED = true;
+    public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
+    public static final int TIMEOUT = 5;
+    public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+    // Dynamic fields
+    private Map<String, String> configuration;
+    private String bucket;
+    private String password = "";
+    private String[] couchbaseNodes;
+    private int numOfVBuckets;
+    private int[] schedule;
+    private String feedName;
+    // Transient fields
+    private transient CouchbaseCore core;
+    private transient Builder builder;
+    private transient DefaultCoreEnvironment env;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validate first
+        if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
+            throw new AsterixException("Unspecified bucket");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
+            throw new AsterixException("Unspecified Couchbase nodes");
+        }
+        if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
+            password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
+        }
+        this.configuration = configuration;
+        bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
+        couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
+        feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
+                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+        env = builder.build();
+        core = new CouchbaseCore(env);
+        getNumberOfVbuckets();
+        schedule();
+    }
+
+    /*
+     * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+     */
+    private void schedule() {
+        schedule = new int[numOfVBuckets];
+        String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+        for (int i = 0; i < numOfVBuckets; i++) {
+            schedule[i] = i % locations.length;
+        }
+    }
+
+    private void getNumberOfVbuckets() {
+        core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
+                .map(new Func1<GetClusterConfigResponse, Integer>() {
+                    @Override
+                    public Integer call(GetClusterConfigResponse response) {
+                        CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
+                        return config.numberOfPartitions();
+                    }
+                }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+    }
+
+    @Override
+    public IRecordReader<? extends RecordWithMetadata<char[]>> createRecordReader(IHyracksTaskContext ctx,
+            int partition) throws Exception {
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        ArrayList<Short> listOfAssignedVBuckets = new ArrayList<Short>();
+        for (int i = 0; i < schedule.length; i++) {
+            if (schedule[i] == partition) {
+                listOfAssignedVBuckets.add((short) i);
+            }
+        }
+        short[] vbuckets = new short[listOfAssignedVBuckets.size()];
+        for (int i = 0; i < vbuckets.length; i++) {
+            vbuckets[i] = listOfAssignedVBuckets.get(i);
+        }
+        CouchbaseReader reader = new CouchbaseReader(feedName + ":" + nodeName + ":" + partition, bucket, password,
+                couchbaseNodes, vbuckets, ExternalDataUtils.getQueueSize(configuration));
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Class<? extends RecordWithMetadata<char[]>> getRecordClass() {
+        RecordWithMetadata<char[]> record = new RecordWithMetadata<char[]>(char[].class);
+        return (Class<? extends RecordWithMetadata<char[]>>) record.getClass();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
deleted file mode 100644
index e9fad25..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.ILookupReaderFactory;
-import org.apache.asterix.external.api.ILookupRecordReader;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.input.record.reader.RCLookupReader;
-import org.apache.asterix.external.input.record.reader.SequenceLookupReader;
-import org.apache.asterix.external.input.record.reader.TextLookupReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.hdfs.dataflow.ConfFactory;
-
-public class HDFSLookupReaderFactory<T> implements ILookupReaderFactory<T> {
-
-    protected static final long serialVersionUID = 1L;
-    protected transient AlgebricksPartitionConstraint clusterLocations;
-    protected ConfFactory confFactory;
-    protected Map<String, String> configuration;
-
-    public HDFSLookupReaderFactory() {
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        clusterLocations = HDFSUtils.getPartitionConstraints(clusterLocations);
-        return clusterLocations;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        this.configuration = configuration;
-        JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
-        confFactory = new ConfFactory(conf);
-
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public ILookupRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition,
-            ExternalFileIndexAccessor snapshotAccessor) throws Exception {
-        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT).trim();
-        JobConf conf = confFactory.getConf();
-        FileSystem fs = FileSystem.get(conf);
-        switch (inputFormatParameter) {
-            case ExternalDataConstants.INPUT_FORMAT_TEXT:
-                return (ILookupRecordReader<? extends T>) new TextLookupReader(snapshotAccessor, fs, conf);
-            case ExternalDataConstants.INPUT_FORMAT_SEQUENCE:
-                return (ILookupRecordReader<? extends T>) new SequenceLookupReader(snapshotAccessor, fs, conf);
-            case ExternalDataConstants.INPUT_FORMAT_RC:
-                return (ILookupRecordReader<? extends T>) new RCLookupReader(snapshotAccessor, fs, conf);
-            default:
-                throw new AsterixException("Unrecognised input format: " + inputFormatParameter);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
deleted file mode 100644
index 05d419d..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.LineRecordReader;
-import org.apache.asterix.external.input.record.reader.QuotedLineRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class LineRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
-        String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
-        LineRecordReader recordReader;
-        if (quoteString != null) {
-            recordReader = new QuotedLineRecordReader();
-        } else {
-            recordReader = new LineRecordReader();
-        }
-        return configureReader(recordReader, ctx, partition);
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-
-    @Override
-    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
deleted file mode 100644
index a672f2f..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.RSSRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-
-public class RSSRecordReaderFactory implements IRecordReaderFactory<SyndEntryImpl> {
-
-    private static final long serialVersionUID = 1L;
-    private Map<String, String> configuration;
-    private List<String> urls = new ArrayList<String>();
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(urls.size());
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        this.configuration = configuration;
-        String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
-        if (url == null) {
-            throw new IllegalArgumentException("no RSS URL provided");
-        }
-        initializeURLs(url);
-    }
-
-    private void initializeURLs(String url) {
-        urls.clear();
-        String[] rssURLs = url.split(",");
-        for (String rssURL : rssURLs) {
-            urls.add(rssURL);
-        }
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
-    @Override
-    public IRecordReader<? extends SyndEntryImpl> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws Exception {
-        RSSRecordReader reader = new RSSRecordReader(urls.get(partition));
-        reader.configure(configuration);
-        return reader;
-    }
-
-    @Override
-    public Class<? extends SyndEntryImpl> getRecordClass() {
-        return SyndEntryImpl.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
deleted file mode 100644
index 91b439c..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.SemiStructuredRecordReader;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
-        SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
-        return configureReader(recordReader, ctx, partition);
-    }
-
-    @Override
-    public Class<? extends char[]> getRecordClass() {
-        return char[].class;
-    }
-
-    @Override
-    protected void configureStreamReaderFactory(Map<String, String> configuration) throws Exception {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
deleted file mode 100644
index 6840c11..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader.factory;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.TwitterPullRecordReader;
-import org.apache.asterix.external.input.record.reader.TwitterPushRecordReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.TwitterUtil;
-import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
-import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-import twitter4j.Status;
-
-public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName());
-
-    private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
-    private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
-
-    private Map<String, String> configuration;
-    private boolean pull;
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        this.configuration = configuration;
-        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
-        if (!validateConfiguration(configuration)) {
-            StringBuilder builder = new StringBuilder();
-            builder.append("One or more parameters are missing from adapter configuration\n");
-            builder.append(AuthenticationConstants.OAUTH_CONSUMER_KEY + "\n");
-            builder.append(AuthenticationConstants.OAUTH_CONSUMER_SECRET + "\n");
-            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n");
-            builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n");
-            throw new Exception(builder.toString());
-        }
-        if (ExternalDataUtils.isPull(configuration)) {
-            pull = true;
-            if (configuration.get(SearchAPIConstants.QUERY) == null) {
-                throw new AsterixException(
-                        "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
-            }
-            String interval = configuration.get(SearchAPIConstants.INTERVAL);
-            if (interval != null) {
-                try {
-                    Integer.parseInt(interval);
-                } catch (NumberFormatException nfe) {
-                    throw new IllegalArgumentException(
-                            "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
-                }
-            } else {
-                configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
-                            + DEFAULT_INTERVAL + ")");
-                }
-            }
-        } else if (ExternalDataUtils.isPush(configuration)) {
-            pull = false;
-        } else {
-            throw new AsterixException("One of boolean parameters " + ExternalDataConstants.KEY_PULL + " and "
-                    + ExternalDataConstants.KEY_PUSH + " must be specified as part of adaptor configuration");
-        }
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
-    @Override
-    public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception {
-        IRecordReader<Status> reader;
-        if (pull) {
-            reader = new TwitterPullRecordReader();
-        } else {
-            reader = new TwitterPushRecordReader();
-        }
-        reader.configure(configuration);
-        return reader;
-    }
-
-    @Override
-    public Class<? extends Status> getRecordClass() {
-        return Status.class;
-    }
-
-    private boolean validateConfiguration(Map<String, String> configuration) {
-        String consumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
-        String consumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
-        String accessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
-        String tokenSecret = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
-        if (consumerKey == null || consumerSecret == null || accessToken == null || tokenSecret == null) {
-            return false;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
new file mode 100644
index 0000000..0627660
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.RecordId;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Base class for HDFS lookup readers that return records as {@code char[]}.
+ * <p>
+ * {@link #lookup(RecordId)} calls {@link #readRecord(RecordId)}, after which the record's raw bytes are
+ * expected to be in {@link #value}; those bytes are decoded as UTF-8 into the reusable {@link #record}.
+ */
+public abstract class AbstractCharRecordLookupReader extends AbstractHDFSLookupRecordReader<char[]> {
+    public AbstractCharRecordLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
+            Configuration conf) {
+        super(snapshotAccessor, fs, conf);
+    }
+
+    protected CharArrayRecord record = new CharArrayRecord();
+    protected Text value = new Text();
+    protected CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    protected ByteBuffer reusableByteBuffer = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    protected CharBuffer reusableCharBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+
+    @Override
+    public Class<?> getRecordClass() throws IOException {
+        return char[].class;
+    }
+
+    /**
+     * Reads the record identified by {@code rid} and returns it as characters. The returned object is this
+     * reader's reusable {@link #record}, which is reset on every call.
+     */
+    @Override
+    protected IRawRecord<char[]> lookup(RecordId rid) throws IOException {
+        record.reset();
+        readRecord(rid);
+        writeRecord();
+        return record;
+    }
+
+    /**
+     * Reads the record identified by {@code rid}. Implementations are expected to leave its raw bytes in
+     * {@link #value}, which {@link #lookup(RecordId)} decodes right after this call returns.
+     */
+    protected abstract void readRecord(RecordId rid) throws IOException;
+
+    /**
+     * Decodes the UTF-8 bytes held in {@link #value}, appends them to {@link #record} and ends the record.
+     *
+     * @throws java.nio.charset.CharacterCodingException
+     *             if {@link #value} holds malformed or truncated UTF-8
+     */
+    private void writeRecord() throws IOException {
+        reusableByteBuffer.clear();
+        if (reusableByteBuffer.remaining() < value.getLength()) {
+            reusableByteBuffer = ByteBuffer
+                    .allocateDirect(value.getLength() + ExternalDataConstants.DEFAULT_BUFFER_INCREMENT);
+        }
+        reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
+        reusableByteBuffer.flip();
+        // The whole record is in the buffer, so decode with endOfInput=true: a truncated or malformed
+        // sequence is then reported as an error instead of being left unconsumed, which would keep the
+        // loop from ever finishing. reset() is needed because the decoder is reused across records.
+        decoder.reset();
+        CoderResult result;
+        do {
+            reusableCharBuffer.clear();
+            result = decoder.decode(reusableByteBuffer, reusableCharBuffer, true);
+            if (result.isUnderflow()) {
+                // all input consumed: complete the decoding operation as CharsetDecoder requires
+                result = decoder.flush(reusableCharBuffer);
+            }
+            if (result.isError()) {
+                result.throwException();
+            }
+            reusableCharBuffer.flip();
+            if (reusableCharBuffer.hasRemaining()) {
+                record.append(reusableCharBuffer);
+            }
+        } while (result.isOverflow());
+        record.endRecord();
+    }
+}