You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/30 00:52:18 UTC

[5/5] incubator-asterixdb git commit: Add flush() to IFrameWriter

Add flush() to IFrameWriter

This method is expected to be used with feeds to push
frames all the way to storage when needed. As of now, it is
needed in two cases:
1. When there is no activity in the ingestion node and buffered
   content needs to be pushed so it can be stored.
2. When the ingestion node needs to move the checkpoint ahead
   while at-least-once semantics are in use.

Two feeds make use of this function: the filesystem feed and the
Couchbase feed, the latter of which is introduced in this change.

Change-Id: Id862ce9e9b1360864c6976f2aea2137092f51203
Reviewed-on: https://asterix-gerrit.ics.uci.edu/585
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/ee387c12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/ee387c12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/ee387c12

Branch: refs/heads/master
Commit: ee387c12bc25b2b5812e639694965105eb47dfa2
Parents: e2439b4
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sat Jan 30 02:33:44 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Jan 29 15:47:27 2016 -0800

----------------------------------------------------------------------
 .../operators/physical/CommitRuntime.java       |   4 +
 .../asterix/feed/FeedMessageReceiver.java       |   7 +-
 ...ConstantTupleSourceOperatorNodePushable.java |   2 +-
 asterix-external-data/pom.xml                   |  10 +
 .../adapter/factory/LookupAdapterFactory.java   |   2 +-
 .../asterix/external/api/IDataParser.java       |   8 +
 .../api/IExternalDataSourceFactory.java         |   4 +-
 .../apache/asterix/external/api/IRawRecord.java |  11 +-
 .../asterix/external/api/IRecordReader.java     |  15 +-
 .../AbstractFeedDataFlowController.java         |   4 +
 .../dataflow/FeedRecordDataFlowController.java  |   2 +
 .../external/dataflow/FeedTupleForwarder.java   |   4 +
 .../external/dataset/adapter/LookupAdapter.java |   7 +-
 .../CollectTransformFeedFrameWriter.java        |   6 +
 .../dataflow/DistributeFeedFrameWriter.java     |   5 +
 .../external/feed/dataflow/FeedFrameCache.java  |   5 +
 .../feed/dataflow/FeedFrameCollector.java       |   9 +
 .../feed/dataflow/FeedFrameHandlers.java        |   4 +
 .../feed/dataflow/FeedRuntimeInputHandler.java  |  10 +-
 .../feed/dataflow/FrameDistributor.java         |  12 +-
 .../external/feed/message/MessageReceiver.java  |  10 +-
 .../external/feed/watch/MonitoredBuffer.java    |   5 +
 .../feed/watch/StorageSideMonitoredBuffer.java  |   4 +-
 .../external/indexing/FileOffsetIndexer.java    |   2 +-
 .../indexing/RecordColumnarIndexer.java         |   2 +-
 .../external/input/HDFSDataSourceFactory.java   |   4 +-
 .../external/input/record/CharArrayRecord.java  |  18 +-
 .../external/input/record/GenericRecord.java    |   5 -
 .../input/record/RecordWithMetadata.java        | 138 ++++++++++
 .../reader/AbstractCharRecordLookupReader.java  |  78 ------
 .../reader/AbstractHDFSLookupRecordReader.java  | 113 --------
 .../reader/AbstractStreamRecordReader.java      |  90 -------
 .../AbstractStreamRecordReaderFactory.java      |  99 -------
 .../input/record/reader/HDFSRecordReader.java   | 194 --------------
 .../input/record/reader/HDFSTextLineReader.java | 234 -----------------
 .../input/record/reader/LineRecordReader.java   | 106 --------
 .../reader/LookupReaderFactoryProvider.java     |  44 ----
 .../record/reader/QuotedLineRecordReader.java   | 119 ---------
 .../input/record/reader/RCLookupReader.java     |  92 -------
 .../input/record/reader/RSSRecordReader.java    | 177 -------------
 .../reader/SemiStructuredRecordReader.java      | 157 -----------
 .../record/reader/SequenceLookupReader.java     |  71 -----
 .../input/record/reader/TextLookupReader.java   |  64 -----
 .../record/reader/TwitterPullRecordReader.java  | 106 --------
 .../record/reader/TwitterPushRecordReader.java  | 132 ----------
 .../reader/couchbase/CouchbaseReader.java       | 259 +++++++++++++++++++
 .../couchbase/CouchbaseReaderFactory.java       | 153 +++++++++++
 .../reader/factory/HDFSLookupReaderFactory.java |  90 -------
 .../reader/factory/LineRecordReaderFactory.java |  55 ----
 .../reader/factory/RSSRecordReaderFactory.java  |  87 -------
 .../SemiStructuredRecordReaderFactory.java      |  46 ----
 .../factory/TwitterRecordReaderFactory.java     | 136 ----------
 .../hdfs/AbstractCharRecordLookupReader.java    |  79 ++++++
 .../hdfs/AbstractHDFSLookupRecordReader.java    | 113 ++++++++
 .../reader/hdfs/HDFSLookupReaderFactory.java    |  87 +++++++
 .../record/reader/hdfs/HDFSRecordReader.java    | 185 +++++++++++++
 .../record/reader/hdfs/HDFSTextLineReader.java  | 234 +++++++++++++++++
 .../record/reader/hdfs/RCLookupReader.java      |  92 +++++++
 .../reader/hdfs/SequenceLookupReader.java       |  71 +++++
 .../record/reader/hdfs/TextLookupReader.java    |  64 +++++
 .../record/reader/rss/RSSRecordReader.java      | 172 ++++++++++++
 .../reader/rss/RSSRecordReaderFactory.java      |  86 ++++++
 .../stream/AbstractStreamRecordReader.java      |  92 +++++++
 .../AbstractStreamRecordReaderFactory.java      |  99 +++++++
 .../record/reader/stream/LineRecordReader.java  | 106 ++++++++
 .../reader/stream/LineRecordReaderFactory.java  |  52 ++++
 .../reader/stream/QuotedLineRecordReader.java   | 119 +++++++++
 .../stream/SemiStructuredRecordReader.java      | 157 +++++++++++
 .../SemiStructuredRecordReaderFactory.java      |  44 ++++
 .../reader/twitter/TwitterPullRecordReader.java | 100 +++++++
 .../reader/twitter/TwitterPushRecordReader.java | 126 +++++++++
 .../twitter/TwitterRecordReaderFactory.java     | 134 ++++++++++
 .../external/input/stream/AInputStream.java     |   5 +
 .../input/stream/AInputStreamReader.java        |   6 +
 .../input/stream/HDFSInputStreamProvider.java   | 118 ---------
 .../stream/LocalFSInputStreamProvider.java      |  62 -----
 .../stream/LocalFileSystemInputStream.java      |   6 +
 .../input/stream/SocketInputStreamProvider.java |  36 ---
 .../TwitterFirehoseInputStreamProvider.java     | 183 -------------
 .../LocalFSInputStreamProviderFactory.java      |   2 +-
 .../SocketInputStreamProviderFactory.java       |   2 +-
 .../provider/HDFSInputStreamProvider.java       | 119 +++++++++
 .../provider/LocalFSInputStreamProvider.java    |  64 +++++
 .../provider/SocketInputStreamProvider.java     |  38 +++
 .../TwitterFirehoseInputStreamProvider.java     | 184 +++++++++++++
 .../ExternalLookupOperatorDescriptor.java       |   7 +-
 .../operators/FeedMetaComputeNodePushable.java  |  14 +-
 .../operators/FeedMetaNodePushable.java         |   5 +
 .../operators/FeedMetaStoreNodePushable.java    |  11 +-
 .../asterix/external/parser/ADMDataParser.java  |  40 +--
 .../external/parser/DelimitedDataParser.java    |  16 +-
 .../parser/RecordWithMetadataParser.java        | 107 ++++++++
 .../RecordWithMetadataParserFactory.java        | 100 +++++++
 .../provider/DatasourceFactoryProvider.java     |  12 +-
 .../provider/LookupReaderFactoryProvider.java   |  44 ++++
 .../provider/ParserFactoryProvider.java         |   9 +-
 .../asterix/external/util/DataflowUtils.java    |   2 +-
 .../external/util/ExternalDataConstants.java    |  21 ++
 .../external/util/ExternalDataUtils.java        |  20 +-
 .../external/util/FileSystemWatcher.java        |  12 +-
 .../apache/asterix/external/util/HDFSUtils.java |   2 +-
 .../metadata/declared/AqlMetadataProvider.java  |   2 +-
 .../metadata/feeds/FeedMetadataUtil.java        |   3 +-
 .../om/util/AsterixClusterProperties.java       |   3 +-
 104 files changed, 3697 insertions(+), 2791 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 92cd61a..ce80291 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -139,4 +139,8 @@ public class CommitRuntime implements IPushRuntime {
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
         this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
index 66eca0c..4ae2e59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
+++ b/asterix-app/src/main/java/org/apache/asterix/feed/FeedMessageReceiver.java
@@ -20,7 +20,6 @@ package org.apache.asterix.feed;
 
 import java.util.logging.Level;
 
-import org.json.JSONObject;
 import org.apache.asterix.external.feed.api.IFeedLoadManager;
 import org.apache.asterix.external.feed.api.IFeedTrackingManager;
 import org.apache.asterix.external.feed.api.IFeedMessage.MessageType;
@@ -35,6 +34,8 @@ import org.apache.asterix.external.feed.watch.NodeLoadReport;
 import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.feed.CentralFeedManager.AQLExecutor;
 import org.apache.asterix.hyracks.bootstrap.FeedBootstrap;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.json.JSONObject;
 
 public class FeedMessageReceiver extends MessageReceiver<String> {
 
@@ -88,4 +89,8 @@ public class FeedMessageReceiver extends MessageReceiver<String> {
         }
 
     }
+
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 48bb6a9..0f705a9 100644
--- a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -49,7 +49,7 @@ public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutput
         }
         try {
             writer.open();
-            appender.flush(writer, true);
+            appender.write(writer, true);
         } catch (Throwable th) {
             writer.fail();
             throw new HyracksDataException(th);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 867c96b..7801fd7 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -277,5 +277,15 @@
             <version>1.2.2</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>core-io</artifactId>
+            <version>1.2.3</version>
+        </dependency>
+        <dependency>
+            <groupId>io.reactivex</groupId>
+            <artifactId>rxjava</artifactId>
+            <version>1.0.15</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index 866910b..76c8b85 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -29,7 +29,7 @@ import org.apache.asterix.external.dataset.adapter.LookupAdapter;
 import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
 import org.apache.asterix.external.indexing.RecordIdReader;
 import org.apache.asterix.external.indexing.RecordIdReaderFactory;
-import org.apache.asterix.external.input.record.reader.LookupReaderFactoryProvider;
+import org.apache.asterix.external.provider.LookupReaderFactoryProvider;
 import org.apache.asterix.external.provider.ParserFactoryProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.comm.IFrameWriter;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index a4a5a43..4ad4c4f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -37,6 +37,7 @@ import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
@@ -124,4 +125,11 @@ public interface IDataParser {
                 break;
         }
     }
+
+    public static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde)
+            throws HyracksDataException {
+        buffer.reset();
+        DataOutput out = buffer.getDataOutput();
+        serde.serialize(serializable, out);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 580ac99..370ea93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -59,6 +59,8 @@ public interface IExternalDataSourceFactory extends Serializable {
      * Specify whether the external data source can be indexed
      * @return
      */
-    public boolean isIndexible();
+    public default boolean isIndexible() {
+        return false;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
index 92b500d..fe15244 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
@@ -35,11 +35,6 @@ public interface IRawRecord<T> {
     public T get();
 
     /**
-     * @return The class of the record objects.
-     */
-    public Class<?> getRecordClass();
-
-    /**
      * Resets the object to prepare it for another write operation.
      */
     public void reset();
@@ -48,4 +43,10 @@ public interface IRawRecord<T> {
      * @return The size of the valid bytes of the object. If the object can't be serialized, this method returns -1
      */
     int size();
+
+    /**
+     * Sets the new value of the record
+     * @param t
+     */
+    public void set(T t);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 019fe8f..3cf467e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -52,15 +52,16 @@ public interface IRecordReader<T> extends Closeable {
     public IRawRecord<T> next() throws IOException, InterruptedException;
 
     /**
-     * @return the class of the java objects representing the records. used to check compatibility between readers and
-     *         parsers.
-     * @throws IOException
-     */
-    public Class<? extends T> getRecordClass() throws IOException;
-
-    /**
      * used to stop reader from producing more records.
      * @return true if the connection to the external source has been suspended, false otherwise.
      */
     public boolean stop();
+
+    /**
+     * set a pointer to the controller of the feed. the controller can be used to flush()
+     * parsed records when waiting for more records to be pushed
+     */
+    public default void setController(IDataFlowController controller) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    };
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index aab4bf6..461eaf9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -65,4 +65,8 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
         tupleForwarder.resume();
         return true;
     }
+
+    public void flush() throws HyracksDataException {
+        tupleForwarder.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index fe4557d..2a4eaf9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -43,6 +43,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 if (record == null) {
+                    flush();
                     Thread.sleep(interval);
                     continue;
                 }
@@ -110,5 +111,6 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     @Override
     public void setRecordReader(IRecordReader<T> recordReader) {
         this.recordReader = recordReader;
+        recordReader.setController(this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d170766..b46a338 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -80,4 +80,8 @@ public class FeedTupleForwarder implements ITupleForwarder {
             FrameUtils.flushFrame(frame.getBuffer(), writer);
         }
     }
+
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index ba6f83c..a97182c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -154,9 +154,14 @@ public final class LookupAdapter<T> implements IFrameWriter {
     @Override
     public void close() throws HyracksDataException {
         try {
-            appender.flush(writer, true);
+            appender.write(writer, true);
         } finally {
             writer.close();
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
index 18b6ec0..3fc7ac8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
@@ -63,6 +63,7 @@ public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideH
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // always project the first field only. why?
         inputFrameTupleAccessor.reset(buffer);
         int nTuple = inputFrameTupleAccessor.getTupleCount();
         for (int t = 0; t < nTuple; t++) {
@@ -116,4 +117,9 @@ public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideH
         this.downstreamWriter = writer;
     }
 
+    @Override
+    public void flush() throws HyracksDataException {
+        tupleAppender.flush(downstreamWriter);
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index 7367d5a..d314f74 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -156,4 +156,9 @@ public class DistributeFeedFrameWriter implements IFrameWriter {
     public FrameDistributor.DistributionMode getDistributionMode() {
         return frameDistributor.getDistributionMode();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        frameDistributor.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
index 0a595b7..159bc43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
@@ -169,4 +169,9 @@ public class FeedFrameCache extends MessageReceiver<ByteBuffer> {
             frameWriter.nextFrame(frame);
         }
     }
+
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+        frameWriter.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
index 0d53524..ef4b87d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
@@ -157,4 +157,13 @@ public class FeedFrameCollector extends MessageReceiver<DataBucket> {
         return connectionId.toString().hashCode();
     }
 
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+        flush();
+    }
+
+    public synchronized void flush() throws HyracksDataException {
+        frameWriter.flush();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
index 6ad00f1..3c45a20 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
@@ -272,6 +272,10 @@ public class FeedFrameHandlers {
                 };
             }
 
+            @Override
+            public void emptyInbox() throws HyracksDataException {
+            }
+
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3a46b1a..a00e732 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -112,7 +112,6 @@ public class FeedRuntimeInputHandler implements IFrameWriter {
         this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
                 feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
                 nPartitions, fpa);
-        this.mBuffer.start();
         this.throttlingEnabled = false;
     }
 
@@ -414,6 +413,7 @@ public class FeedRuntimeInputHandler implements IFrameWriter {
     @Override
     public void open() throws HyracksDataException {
         coreOperator.open();
+        mBuffer.start();
     }
 
     @Override
@@ -465,4 +465,12 @@ public class FeedRuntimeInputHandler implements IFrameWriter {
     public void setBufferingEnabled(boolean bufferingEnabled) {
         this.bufferingEnabled = bufferingEnabled;
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // Only flush when in process mode.
+        if (mode == Mode.PROCESS) {
+            coreOperator.flush();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
index 543efb2..85308df 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
 import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -358,4 +358,22 @@ public class FrameDistributor {
         return fta;
     }
 
+    /**
+     * Flushes the first registered collector when the distributor is in SINGLE mode; a no-op in every other mode.
+     *
+     * @throws HyracksDataException propagated from the collector's flush()
+     */
+    public void flush() throws HyracksDataException {
+        switch (distributionMode) {
+            case SINGLE:
+                // Guard the lookup: iterator().next() throws NoSuchElementException when nothing is registered.
+                if (!registeredCollectors.isEmpty()) {
+                    registeredCollectors.values().iterator().next().flush();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
index abeb994..f69c552 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
@@ -25,6 +25,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.external.feed.api.IMessageReceiver;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
 
@@ -74,12 +75,17 @@ public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
             this.inbox = messageReceiver.inbox;
             this.messageReceiver = messageReceiver;
         }
+        // TODO: handle exceptions properly; run() currently only prints the stack trace of an InterruptedException.
 
         @Override
         public void run() {
             while (true) {
                 try {
-                    T message = inbox.take();
+                    T message = inbox.poll();
+                    if (message == null) {
+                        messageReceiver.emptyInbox();
+                        message = inbox.take();
+                    }
                     messageReceiver.processMessage(message);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
@@ -108,4 +114,6 @@ public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
         }
     }
 
+    public abstract void emptyInbox() throws HyracksDataException; // called by run() when inbox.poll() returns null, before it blocks on take()
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
index db38edf..b93410a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
@@ -393,4 +393,9 @@ public abstract class MonitoredBuffer extends MessageReceiver<DataBucket> {
         return storageTimeTrackingRateTask;
     }
 
+    @Override
+    public void emptyInbox() throws HyracksDataException {
+        inputHandler.flush(); // the inbox is empty, so flush the input handler
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
index 1f9551d..9db930e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
@@ -38,8 +38,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class StorageSideMonitoredBuffer extends MonitoredBuffer {
 
-    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 10
-                                                                      // seconds
+    private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000;
 
     private boolean ackingEnabled;
     private final boolean timeTrackingEnabled;
@@ -207,5 +206,4 @@ public class StorageSideMonitoredBuffer extends MonitoredBuffer {
     protected boolean reportInflowRate() {
         return false;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
index 932aece..0fbbd2e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.base.AMutableInt64;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
index 14235c0..9fa26f0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.base.AMutableInt64;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 7e9fdcb..aa35383 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -29,8 +29,8 @@ import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingScheduler;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
-import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index fd5c397..365aeb0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.record;
 
+import java.nio.CharBuffer;
 import java.util.Arrays;
 
 import org.apache.asterix.external.api.IRawRecord;
@@ -99,8 +100,35 @@ public class CharArrayRecord implements IRawRecord<char[]> {
         size++;
     }
 
+    /**
+     * Appends the whole of {@code recordBuffer} to this record.
+     *
+     * @param recordBuffer characters to copy onto the end of the record
+     */
+    public void append(char[] recordBuffer) {
+        ensureCapacity(size + recordBuffer.length);
+        System.arraycopy(recordBuffer, 0, value, size, recordBuffer.length);
+        size += recordBuffer.length;
+    }
+
+    /**
+     * Appends the characters between the buffer's position and its limit to this record.
+     *
+     * @param chars buffer to drain; the bulk get advances its position to its limit
+     */
+    public void append(CharBuffer chars) {
+        // Use remaining(), not limit(): a bulk get of limit() chars throws BufferUnderflowException
+        // whenever position() > 0, and limit() would also over-count the appended length.
+        int length = chars.remaining();
+        ensureCapacity(size + length);
+        chars.get(value, size, length);
+        size += length;
+    }
+
     @Override
-    public Class<char[]> getRecordClass() {
-        return char[].class;
+    public void set(char[] value) {
+        // Adopts the caller's array directly (no copy); the record size becomes the array length.
+        this.value = value;
+        this.size = value.length;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
index 365bc22..f405b82 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
@@ -47,10 +47,6 @@ public class GenericRecord<T> implements IRawRecord<T> {
     }
 
     @Override
-    public Class<?> getRecordClass() {
-        return record.getClass();
-    }
-
     public void set(T record) {
         this.record = record;
     }
@@ -58,5 +54,4 @@ public class GenericRecord<T> implements IRawRecord<T> {
     @Override
     public void reset() {
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
new file mode 100644
index 0000000..d5640a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * Holds a raw record together with one buffer per metadata field.
+ * <p>
+ * Each setter writes the field's type tag byte to the field buffer and then passes the value to
+ * {@code IDataParser.toBytes} with the matching serializer.
+ * NOTE(review): the setters do not clear the buffer first; presumably callers invoke {@link #reset()}
+ * between records - confirm against the call sites.
+ *
+ * @param <T> payload type of the wrapped {@link IRawRecord}
+ */
+public class RecordWithMetadata<T> {
+
+    private ArrayBackedValueStorage[] fieldValueBuffers;
+    private DataOutput[] fieldValueBufferOutputs;
+    private IValueParserFactory[] valueParserFactories;
+    private byte[] fieldTypeTags;
+    private IRawRecord<T> record;
+
+    // Serializers
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    private AMutableDouble mutableDouble = new AMutableDouble(0);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ASTRING);
+    private AMutableString mutableString = new AMutableString(null);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    private AMutableInt32 mutableInt = new AMutableInt32(0);
+    @SuppressWarnings("unchecked")
+    protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private AMutableInt64 mutableLong = new AMutableInt64(0);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+
+    /**
+     * Creates an instance that carries no metadata fields.
+     *
+     * @param recordClass class of the record payload; not used by this constructor
+     */
+    public RecordWithMetadata(Class<? extends T> recordClass) {
+        // Delegate with zero fields so the arrays are never null and reset() cannot fail on them.
+        this(new IAType[0], recordClass);
+    }
+
+    /**
+     * Creates an instance with one metadata buffer per entry of {@code metaTypes}.
+     *
+     * @param metaTypes   types of the metadata fields, in field order
+     * @param recordClass class of the record payload; not used by this constructor
+     */
+    public RecordWithMetadata(IAType[] metaTypes, Class<? extends T> recordClass) {
+        int n = metaTypes.length;
+        this.fieldValueBuffers = new ArrayBackedValueStorage[n];
+        this.fieldValueBufferOutputs = new DataOutput[n];
+        this.valueParserFactories = new IValueParserFactory[n];
+        this.fieldTypeTags = new byte[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = metaTypes[i].getTypeTag();
+            fieldTypeTags[i] = tag.serialize();
+            fieldValueBuffers[i] = new ArrayBackedValueStorage();
+            fieldValueBufferOutputs[i] = fieldValueBuffers[i].getDataOutput();
+            valueParserFactories[i] = ExternalDataUtils.getParserFactory(tag);
+        }
+    }
+
+    /** Returns the wrapped record, or {@code null} if {@link #setRecord} has not been called. */
+    public IRawRecord<T> getRecord() {
+        return record;
+    }
+
+    /** Returns the buffer that holds metadata field {@code index}. */
+    public ArrayBackedValueStorage getMetadata(int index) {
+        return fieldValueBuffers[index];
+    }
+
+    /** Sets the wrapped record. */
+    public void setRecord(IRawRecord<T> record) {
+        this.record = record;
+    }
+
+    /** Resets the wrapped record (when one is set) and every metadata buffer. */
+    public void reset() {
+        // No record is attached until setRecord() is called; skip it instead of throwing.
+        if (record != null) {
+            record.reset();
+        }
+        for (ArrayBackedValueStorage fieldBuffer : fieldValueBuffers) {
+            fieldBuffer.reset();
+        }
+    }
+
+    /** Writes the type tag of field {@code index}, then hands {@code value} to the int32 serializer. */
+    public void setMetadata(int index, int value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableInt.setValue(value);
+        IDataParser.toBytes(mutableInt, fieldValueBuffers[index], int32Serde);
+    }
+
+    /** Writes the type tag of field {@code index}, then hands {@code value} to the int64 serializer. */
+    public void setMetadata(int index, long value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableLong.setValue(value);
+        IDataParser.toBytes(mutableLong, fieldValueBuffers[index], int64Serde);
+    }
+
+    /** Writes the type tag of field {@code index}, then hands {@code value} to the string serializer. */
+    public void setMetadata(int index, String value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableString.setValue(value);
+        IDataParser.toBytes(mutableString, fieldValueBuffers[index], stringSerde);
+    }
+
+    /** Writes the type tag of field {@code index}, then hands the boolean to the boolean serializer. */
+    public void setMeta(int index, boolean value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        IDataParser.toBytes(value ? ABoolean.TRUE : ABoolean.FALSE, fieldValueBuffers[index], booleanSerde);
+    }
+
+    /** Writes the type tag of field {@code index}, then hands {@code value} to the double serializer. */
+    public void setMeta(int index, double value) throws IOException {
+        fieldValueBufferOutputs[index].writeByte(fieldTypeTags[index]);
+        mutableDouble.setValue(value);
+        IDataParser.toBytes(mutableDouble, fieldValueBuffers[index], doubleSerde);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
deleted file mode 100644
index 1b84e7a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-
-public abstract class AbstractCharRecordLookupReader extends AbstractHDFSLookupRecordReader<char[]> {
-    public AbstractCharRecordLookupReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
-            Configuration conf) {
-        super(snapshotAccessor, fs, conf);
-    }
-
-    protected CharArrayRecord record = new CharArrayRecord();
-    protected Text value = new Text();
-    protected CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
-    protected ByteBuffer reusableByteBuffer = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    protected CharBuffer reusableCharBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-
-    @Override
-    public Class<?> getRecordClass() throws IOException {
-        return char[].class;
-    }
-
-    @Override
-    protected IRawRecord<char[]> lookup(RecordId rid) throws IOException {
-        record.reset();
-        readRecord(rid);
-        writeRecord();
-        return record;
-    }
-
-    protected abstract void readRecord(RecordId rid) throws IOException;
-
-    private void writeRecord() {
-        reusableByteBuffer.clear();
-        if (reusableByteBuffer.remaining() < value.getLength()) {
-            reusableByteBuffer = ByteBuffer
-                    .allocateDirect(value.getLength() + ExternalDataConstants.DEFAULT_BUFFER_INCREMENT);
-        }
-        reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
-        reusableByteBuffer.flip();
-        while (reusableByteBuffer.hasRemaining()) {
-            decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
-            record.append(reusableCharBuffer.array(), 0, reusableCharBuffer.position());
-            reusableCharBuffer.clear();
-        }
-        record.endRecord();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
deleted file mode 100644
index 5a20962..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.external.api.ILookupRecordReader;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.indexing.RecordId;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class AbstractHDFSLookupRecordReader<T> implements ILookupRecordReader<T> {
-
-    protected int fileId;
-    private ExternalFileIndexAccessor snapshotAccessor;
-    protected ExternalFile file;
-    protected FileSystem fs;
-    protected Configuration conf;
-    protected boolean replaced;
-
-    public AbstractHDFSLookupRecordReader(ExternalFileIndexAccessor snapshotAccessor, FileSystem fs,
-            Configuration conf) {
-        this.snapshotAccessor = snapshotAccessor;
-        this.fs = fs;
-        this.conf = conf;
-        this.fileId = -1;
-        this.file = new ExternalFile();
-    }
-
-    @Override
-    public void configure(Map<String, String> configurations) throws Exception {
-    }
-
-    @Override
-    public IRawRecord<T> read(RecordId rid) throws Exception {
-        if (rid.getFileId() != fileId) {
-            // close current file
-            closeFile();
-            // lookup new file
-            snapshotAccessor.lookup(rid.getFileId(), file);
-            fileId = rid.getFileId();
-            try {
-                validate();
-                if (!replaced) {
-                    openFile();
-                    validate();
-                    if (replaced) {
-                        closeFile();
-                    }
-                }
-            } catch (FileNotFoundException e) {
-                replaced = true;
-            }
-        }
-        if (replaced) {
-            return null;
-        }
-        return lookup(rid);
-    }
-
-    protected abstract IRawRecord<T> lookup(RecordId rid) throws IOException;
-
-    private void validate() throws IllegalArgumentException, IOException {
-        FileStatus fileStatus = fs.getFileStatus(new Path(file.getFileName()));
-        replaced = fileStatus.getModificationTime() != file.getLastModefiedTime().getTime();
-    }
-
-    protected abstract void closeFile();
-
-    protected abstract void openFile() throws IllegalArgumentException, IOException;
-
-    @Override
-    public final void open() throws HyracksDataException {
-        snapshotAccessor.open();
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            closeFile();
-        } finally {
-            snapshotAccessor.close();
-        }
-    }
-
-    @Override
-    public void fail() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
deleted file mode 100644
index 93ba0a0..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.input.stream.AInputStream;
-import org.apache.asterix.external.input.stream.AInputStreamReader;
-import org.apache.asterix.external.util.ExternalDataConstants;
-
-public abstract class AbstractStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
-    protected AInputStreamReader reader;
-    protected CharArrayRecord record;
-    protected char[] inputBuffer;
-    protected int bufferLength = 0;
-    protected int bufferPosn = 0;
-    protected IExternalIndexer indexer;
-    protected boolean done = false;
-
-    @Override
-    public IRawRecord<char[]> next() throws IOException {
-        return record;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!done) {
-            reader.close();
-        }
-        done = true;
-    }
-
-    public void setInputStream(AInputStream inputStream) throws IOException {
-        this.reader = new AInputStreamReader(inputStream);
-    }
-
-    @Override
-    public Class<char[]> getRecordClass() {
-        return char[].class;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        record = new CharArrayRecord();
-        inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
-    }
-
-    @Override
-    public IExternalIndexer getIndexer() {
-        return indexer;
-    }
-
-    @Override
-    public void setIndexer(IExternalIndexer indexer) {
-        this.indexer = indexer;
-    }
-
-    @Override
-    public boolean stop() {
-        try {
-            reader.stop();
-            return true;
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
deleted file mode 100644
index c7acb1a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexibleExternalDataSource;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IInputStreamProvider;
-import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public abstract class AbstractStreamRecordReaderFactory<T>
-        implements IRecordReaderFactory<T>, IIndexibleExternalDataSource {
-
-    private static final long serialVersionUID = 1L;
-    protected IInputStreamProviderFactory inputStreamFactory;
-    protected Map<String, String> configuration;
-
-    public AbstractStreamRecordReaderFactory<T> setInputStreamFactoryProvider(
-            IInputStreamProviderFactory inputStreamFactory) {
-        this.inputStreamFactory = inputStreamFactory;
-        return this;
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.RECORDS;
-    }
-
-    @Override
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return inputStreamFactory.getPartitionConstraint();
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        this.configuration = configuration;
-        inputStreamFactory.configure(configuration);
-        configureStreamReaderFactory(configuration);
-    }
-
-    protected abstract void configureStreamReaderFactory(Map<String, String> configuration) throws Exception;
-
-    @Override
-    public boolean isIndexible() {
-        return inputStreamFactory.isIndexible();
-    }
-
-    @Override
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception {
-        ((IIndexibleExternalDataSource) inputStreamFactory).setSnapshot(files, indexingOp);
-    }
-
-    @Override
-    public boolean isIndexingOp() {
-        if (inputStreamFactory.isIndexible()) {
-            return ((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp();
-        }
-        return false;
-    }
-
-    protected IRecordReader<char[]> configureReader(AbstractStreamRecordReader recordReader, IHyracksTaskContext ctx,
-            int partition) throws Exception {
-        IInputStreamProvider inputStreamProvider = inputStreamFactory.createInputStreamProvider(ctx, partition);
-        IExternalIndexer indexer = null;
-        if (inputStreamFactory.isIndexible()) {
-            if (((IIndexibleExternalDataSource) inputStreamFactory).isIndexingOp()) {
-                indexer = ((IIndexingDatasource) inputStreamProvider).getIndexer();
-            }
-        }
-        recordReader.setInputStream(inputStreamProvider.getInputStream());
-        recordReader.setIndexer(indexer);
-        recordReader.configure(configuration);
-        return recordReader;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
deleted file mode 100644
index d88f967..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.IExternalIndexer;
-import org.apache.asterix.external.api.IIndexingDatasource;
-import org.apache.asterix.external.api.IRawRecord;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class HDFSRecordReader<K, V extends Writable> implements IRecordReader<Writable>, IIndexingDatasource {
-
-    protected RecordReader<K, Writable> reader;
-    protected V value = null;
-    protected K key = null;
-    protected int currentSplitIndex = 0;
-    protected boolean read[];
-    protected InputFormat<?, ?> inputFormat;
-    protected InputSplit[] inputSplits;
-    protected String[] readSchedule;
-    protected String nodeName;
-    protected JobConf conf;
-    protected GenericRecord<Writable> record;
-    // Indexing variables
-    protected IExternalIndexer indexer;
-    protected List<ExternalFile> snapshot;
-    protected FileSystem hdfs;
-
-    public HDFSRecordReader(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-            JobConf conf) {
-        this.read = read;
-        this.inputSplits = inputSplits;
-        this.readSchedule = readSchedule;
-        this.nodeName = nodeName;
-        this.conf = conf;
-        this.inputFormat = conf.getInputFormat();
-        this.reader = new EmptyRecordReader<K, Writable>();
-    }
-
-    @Override
-    public void close() throws IOException {
-        reader.close();
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        record = new GenericRecord<Writable>();
-        nextInputSplit();
-    }
-
-    @Override
-    public boolean hasNext() throws Exception {
-        if (reader.next(key, value)) {
-            return true;
-        }
-        while (nextInputSplit()) {
-            if (reader.next(key, value)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public IRawRecord<Writable> next() throws IOException {
-        record.set(value);
-        return record;
-    }
-
-    @Override
-    public Class<? extends Writable> getRecordClass() throws IOException {
-        if (value == null) {
-            if (!nextInputSplit()) {
-                return null;
-            }
-        }
-        return value.getClass();
-    }
-
-    private boolean nextInputSplit() throws IOException {
-        for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
-            /**
-             * read all the partitions scheduled to the current node
-             */
-            if (readSchedule[currentSplitIndex].equals(nodeName)) {
-                /**
-                 * pick an unread split to read; synchronize among
-                 * simultaneous partitions on the same machine
-                 */
-                synchronized (read) {
-                    if (read[currentSplitIndex] == false) {
-                        read[currentSplitIndex] = true;
-                    } else {
-                        continue;
-                    }
-                }
-                if (snapshot != null) {
-                    String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
-                    FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
-                    // Skip if not the same file stored in the files snapshot
-                    if (fileStatus.getModificationTime() != snapshot.get(currentSplitIndex).getLastModefiedTime()
-                            .getTime())
-                        continue;
-                }
-
-                reader.close();
-                reader = getRecordReader(currentSplitIndex);
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @SuppressWarnings("unchecked")
-    private RecordReader<K, Writable> getRecordReader(int splitIndex) throws IOException {
-        reader = (RecordReader<K, Writable>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
-        if (key == null) {
-            key = reader.createKey();
-            value = (V) reader.createValue();
-        }
-        if (indexer != null) {
-            try {
-                indexer.reset(this);
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        return reader;
-    }
-
-    @Override
-    public boolean stop() {
-        return false;
-    }
-
-    @Override
-    public IExternalIndexer getIndexer() {
-        return indexer;
-    }
-
-    @Override
-    public void setIndexer(IExternalIndexer indexer) {
-        this.indexer = indexer;
-    }
-
-    public List<ExternalFile> getSnapshot() {
-        return snapshot;
-    }
-
-    public void setSnapshot(List<ExternalFile> snapshot) throws IOException {
-        this.snapshot = snapshot;
-        hdfs = FileSystem.get(conf);
-    }
-
-    public int getCurrentSplitIndex() {
-        return currentSplitIndex;
-    }
-
-    public RecordReader<K, Writable> getReader() {
-        return reader;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
deleted file mode 100644
index ea851a5..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-
-public class HDFSTextLineReader {
-    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-    private int bufferSize = DEFAULT_BUFFER_SIZE;
-    private FSDataInputStream reader;
-
-    private byte[] buffer;
-    // the number of bytes of real data in the buffer
-    private int bufferLength = 0;
-    // the current position in the buffer
-    private int bufferPosn = 0;
-
-    private long currentFilePos = 0L;
-
-    private static final byte CR = '\r';
-    private static final byte LF = '\n';
-
-    public static final String KEY_BUFFER_SIZE = "io.file.buffer.size";
-
-    /**
-     * Create a line reader that reads from the given stream using the
-     * default buffer-size (32k).
-     *
-     * @param in
-     *            The input stream
-     * @throws IOException
-     */
-    public HDFSTextLineReader(FSDataInputStream in) throws IOException {
-        this(in, DEFAULT_BUFFER_SIZE);
-    }
-
-    /**
-     * Create a line reader that reads from the given stream using the
-     * given buffer-size.
-     *
-     * @param in
-     *            The input stream
-     * @param bufferSize
-     *            Size of the read buffer
-     * @throws IOException
-     */
-    public HDFSTextLineReader(FSDataInputStream in, int bufferSize) throws IOException {
-        this.reader = in;
-        this.bufferSize = bufferSize;
-        this.buffer = new byte[this.bufferSize];
-        currentFilePos = in.getPos();
-    }
-
-    public HDFSTextLineReader() throws IOException {
-        this.bufferSize = DEFAULT_BUFFER_SIZE;
-        this.buffer = new byte[this.bufferSize];
-    }
-
-    /**
-     * Create a line reader that reads from the given stream using the <code>io.file.buffer.size</code> specified in the given <code>Configuration</code>.
-     *
-     * @param in
-     *            input stream
-     * @param conf
-     *            configuration
-     * @throws IOException
-     */
-    public HDFSTextLineReader(FSDataInputStream in, Configuration conf) throws IOException {
-        this(in, conf.getInt(KEY_BUFFER_SIZE, DEFAULT_BUFFER_SIZE));
-    }
-
-    /**
-     * Read one line from the InputStream into the given Text. A line
-     * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
-     * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
-     * line.
-     *
-     * @param str
-     *            the object to store the given line (without newline)
-     * @param maxLineLength
-     *            the maximum number of bytes to store into str;
-     *            the rest of the line is silently discarded.
-     * @param maxBytesToConsume
-     *            the maximum number of bytes to consume
-     *            in this call. This is only a hint, because if the line crosses
-     *            this threshold, we allow it to happen. It can overshoot
-     *            potentially by as much as one buffer length.
-     * @return the number of bytes read including the (longest) newline
-     *         found.
-     * @throws IOException
-     *             if the underlying stream throws
-     */
-    public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
-        /* We're reading data from in, but the head of the stream may be
-         * already buffered in buffer, so we have several cases:
-         * 1. No newline characters are in the buffer, so we need to copy
-         *    everything and read another buffer from the stream.
-         * 2. An unambiguously terminated line is in buffer, so we just
-         *    copy to str.
-         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-         *    in CR.  In this case we copy everything up to CR to str, but
-         *    we also need to see what follows CR: if it's LF, then we
-         *    need to consume LF as well, so next call to readLine will read
-         *    from after that.
-         * We use a flag prevCharCR to signal if previous character was CR
-         * and, if it happens to be at the end of the buffer, delay
-         * consuming it until we have a chance to look at the char that
-         * follows.
-         */
-        str.clear();
-        int txtLength = 0; //tracks str.getLength(), as an optimization
-        int newlineLength = 0; //length of terminating newline
-        boolean prevCharCR = false; //true if prev char was CR
-        long bytesConsumed = 0;
-        do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                if (prevCharCR)
-                    ++bytesConsumed; //account for CR from previous read
-                bufferLength = reader.read(buffer);
-                if (bufferLength <= 0)
-                    break; // EOF
-            }
-            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-                if (buffer[bufferPosn] == LF) {
-                    newlineLength = (prevCharCR) ? 2 : 1;
-                    ++bufferPosn; // at next invocation proceed from following byte
-                    break;
-                }
-                if (prevCharCR) { //CR + notLF, we are at notLF
-                    newlineLength = 1;
-                    break;
-                }
-                prevCharCR = (buffer[bufferPosn] == CR);
-            }
-            int readLength = bufferPosn - startPosn;
-            if (prevCharCR && newlineLength == 0)
-                --readLength; //CR at the end of the buffer
-            bytesConsumed += readLength;
-            int appendLength = readLength - newlineLength;
-            if (appendLength > maxLineLength - txtLength) {
-                appendLength = maxLineLength - txtLength;
-            }
-            if (appendLength > 0) {
-                str.append(buffer, startPosn, appendLength);
-                txtLength += appendLength;
-            }
-        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
-
-        if (bytesConsumed > Integer.MAX_VALUE)
-            throw new IOException("Too many bytes before newline: " + bytesConsumed);
-        currentFilePos = reader.getPos() - bufferLength + bufferPosn;
-        return (int) bytesConsumed;
-    }
-
-    /**
-     * Read from the InputStream into the given Text.
-     *
-     * @param str
-     *            the object to store the given line
-     * @param maxLineLength
-     *            the maximum number of bytes to store into str.
-     * @return the number of bytes read including the newline
-     * @throws IOException
-     *             if the underlying stream throws
-     */
-    public int readLine(Text str, int maxLineLength) throws IOException {
-        return readLine(str, maxLineLength, Integer.MAX_VALUE);
-    }
-
-    /**
-     * Read from the InputStream into the given Text.
-     *
-     * @param str
-     *            the object to store the given line
-     * @return the number of bytes read including the newline
-     * @throws IOException
-     *             if the underlying stream throws
-     */
-    public int readLine(Text str) throws IOException {
-        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
-    }
-
-    public void seek(long desired) throws IOException {
-        if (reader.getPos() <= desired || currentFilePos > desired) {
-            // desired position is ahead of stream or before the current position, seek to position
-            reader.seek(desired);
-            bufferLength = 0;
-            bufferPosn = 0;
-            currentFilePos = desired;
-        } else if (currentFilePos < desired) {
-            // desired position is in the buffer
-            int difference = (int) (desired - currentFilePos);
-            bufferPosn += difference;
-            currentFilePos = desired;
-        }
-    }
-
-    public FSDataInputStream getReader() {
-        return reader;
-    }
-
-    public void resetReader(FSDataInputStream reader) throws IOException {
-        this.reader = reader;
-        bufferLength = 0;
-        bufferPosn = 0;
-        currentFilePos = reader.getPos();
-    }
-
-    public void close() throws IOException {
-        reader.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ee387c12/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
deleted file mode 100644
index 2b33d7a..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.record.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-
-public class LineRecordReader extends AbstractStreamRecordReader {
-
-    protected boolean prevCharCR;
-    protected int newlineLength;
-    protected int recordNumber = 0;
-
-    @Override
-    public boolean hasNext() throws IOException {
-        if (done) {
-            return false;
-        }
-        /* We're reading data from in, but the head of the stream may be
-         * already buffered in buffer, so we have several cases:
-         * 1. No newline characters are in the buffer, so we need to copy
-         *    everything and read another buffer from the stream.
-         * 2. An unambiguously terminated line is in buffer, so we just
-         *    copy to record.
-         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
-         *    in CR.  In this case we copy everything up to CR to record, but
-         *    we also need to see what follows CR: if it's LF, then we
-         *    need to consume LF as well, so next call to hasNext will read
-         *    from after that.
-         * We use a flag prevCharCR to signal if previous character was CR
-         * and, if it happens to be at the end of the buffer, delay
-         * consuming it until we have a chance to look at the char that
-         * follows.
-         */
-        newlineLength = 0; //length of terminating newline
-        prevCharCR = false; //true if prev char was CR
-        record.reset();
-        int readLength = 0;
-        do {
-            int startPosn = bufferPosn; //starting from where we left off the last time
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-                bufferLength = reader.read(inputBuffer);
-                if (bufferLength <= 0) {
-                    if (readLength > 0) {
-                        record.endRecord();
-                        recordNumber++;
-                        return true;
-                    }
-                    close();
-                    return false; //EOF
-                }
-            }
-            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
-                if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
-                    newlineLength = (prevCharCR) ? 2 : 1;
-                    ++bufferPosn; // at next invocation proceed from following byte
-                    break;
-                }
-                if (prevCharCR) { //CR + notLF, we are at notLF
-                    newlineLength = 1;
-                    break;
-                }
-                prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-            }
-            readLength = bufferPosn - startPosn;
-            if (prevCharCR && newlineLength == 0) {
-                --readLength; //CR at the end of the buffer
-            }
-            if (readLength > 0) {
-                record.append(inputBuffer, startPosn, readLength);
-            }
-        } while (newlineLength == 0);
-        recordNumber++;
-        return true;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) throws Exception {
-        super.configure(configuration);
-        if (ExternalDataUtils.hasHeader(configuration)) {
-            if (hasNext()) {
-                next();
-            }
-        }
-    }
-}
\ No newline at end of file