You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:07 UTC

[09/21] incubator-asterixdb git commit: First stage of external data cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
new file mode 100644
index 0000000..146064a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
+
+/**
+ * Parses delimited text (fields separated by {@code fieldDelimiter}, optionally quoted with {@code quote}) into
+ * records of the configured {@link ARecordType}. Depending on the configured data source it acts either as a stream
+ * parser over an {@link InputStream} or as a record parser over {@code char[]} records. Records in which every parsed
+ * field is null are not written.
+ */
+public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
+
+    private final IValueParserFactory[] valueParserFactories;
+    private final char fieldDelimiter;
+    private final char quote;
+    private final boolean hasHeader;
+    private ARecordType recordType;
+    private IARecordBuilder recBuilder;
+    private ArrayBackedValueStorage fieldValueBuffer;
+    private DataOutput fieldValueBufferOutput;
+    private IValueParser[] valueParsers;
+    private FieldCursorForDelimitedDataParser cursor;
+    private byte[] fieldTypeTags;
+    private int[] fldIds;
+    private ArrayBackedValueStorage[] nameBuffers;
+    // Set by parseRecord(): true when no field of the current record produced a non-null value.
+    private boolean areAllNullFields;
+    private boolean isStreamParser = true;
+
+    /**
+     * @param valueParserFactories one factory per field, in field order
+     * @param fieldDelimiter character separating fields
+     * @param quote character used to quote fields
+     * @param hasHeader whether the first line of a stream is a header that must be skipped
+     */
+    public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimiter, char quote,
+            boolean hasHeader) {
+        this.valueParserFactories = valueParserFactories;
+        this.fieldDelimiter = fieldDelimiter;
+        this.quote = quote;
+        this.hasHeader = hasHeader;
+    }
+
+    /**
+     * Reads records from the stream until one with at least one non-null field is found and writes it to
+     * {@code out}.
+     *
+     * @return true if a record was written, false once the stream has no more records
+     */
+    @Override
+    public boolean parse(DataOutput out) throws AsterixException, IOException {
+        while (cursor.nextRecord()) {
+            parseRecord();
+            if (!areAllNullFields) {
+                recBuilder.write(out, true);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Parses the cursor's current record into {@code recBuilder} and updates {@link #areAllNullFields}. The record
+     * itself is not written; callers decide whether to write it.
+     */
+    private void parseRecord() throws AsterixException, IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        areAllNullFields = true;
+
+        for (int i = 0; i < valueParsers.length; ++i) {
+            if (!cursor.nextField()) {
+                break;
+            }
+            fieldValueBuffer.reset();
+
+            ATypeTag fieldTag = recordType.getFieldTypes()[i].getTypeTag();
+            if (cursor.fStart == cursor.fEnd && fieldTag != ATypeTag.STRING && fieldTag != ATypeTag.NULL) {
+                // An empty field becomes NULL, which only optional types accept. String fields are excluded
+                // because an empty field is a valid (empty) string value.
+                if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
+                    throw new AsterixException("At record: " + cursor.recordCount + " - Field " + cursor.fieldCount
+                            + " is not an optional type so it cannot accept null value. ");
+                }
+                fieldValueBufferOutput.writeByte(ATypeTag.NULL.serialize());
+                // The NULL value belongs to this field's buffer, not to the record output stream.
+                ANullSerializerDeserializer.INSTANCE.serialize(ANull.NULL, fieldValueBufferOutput);
+            } else {
+                fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
+                // Eliminate double quotes in the field that we are going to parse
+                if (cursor.isDoubleQuoteIncludedInThisField) {
+                    cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                    cursor.fEnd -= cursor.doubleQuoteCount;
+                    cursor.isDoubleQuoteIncludedInThisField = false;
+                }
+                valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
+                        fieldValueBufferOutput);
+                areAllNullFields = false;
+            }
+            // Fields unknown to the builder (open fields) are added by serialized name; known ones by id.
+            if (fldIds[i] < 0) {
+                recBuilder.addField(nameBuffers[i], fieldValueBuffer);
+            } else {
+                recBuilder.addField(fldIds[i], fieldValueBuffer);
+            }
+        }
+    }
+
+    /**
+     * Serializes {@code fieldName} as a string value into {@code buffer}, reusing {@code str} as scratch.
+     */
+    protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
+            throws HyracksDataException {
+        buffer.reset();
+        DataOutput out = buffer.getDataOutput();
+        str.setValue(fieldName);
+        try {
+            stringSerde.serialize(str, out);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return isStreamParser ? DataSourceType.STREAM : DataSourceType.RECORDS;
+    }
+
+    /**
+     * Creates the per-field value parsers, the record builder and per-field type tags/ids for {@code recordType}.
+     * In record (non-stream) mode this also creates a cursor without a reader.
+     *
+     * @throws HyracksDataException if a field is not declared in a closed record type
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType) throws HyracksDataException {
+        this.recordType = recordType;
+        valueParsers = new IValueParser[valueParserFactories.length];
+        for (int i = 0; i < valueParserFactories.length; ++i) {
+            valueParsers[i] = valueParserFactories[i].createValueParser();
+        }
+
+        fieldValueBuffer = new ArrayBackedValueStorage();
+        fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+
+        int n = recordType.getFieldNames().length;
+        fieldTypeTags = new byte[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            fieldTypeTags[i] = tag.serialize();
+        }
+
+        fldIds = new int[n];
+        nameBuffers = new ArrayBackedValueStorage[n];
+        AMutableString str = new AMutableString(null);
+        for (int i = 0; i < n; i++) {
+            String name = recordType.getFieldNames()[i];
+            fldIds[i] = recBuilder.getFieldId(name);
+            if (fldIds[i] < 0) {
+                if (!recordType.isOpen()) {
+                    throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
+                } else {
+                    nameBuffers[i] = new ArrayBackedValueStorage();
+                    fieldNameToBytes(name, str, nameBuffers[i]);
+                }
+            }
+        }
+        isStreamParser = ExternalDataUtils.isDataSourceStreamProvider(configuration);
+        if (!isStreamParser) {
+            cursor = new FieldCursorForDelimitedDataParser(null, fieldDelimiter, quote);
+        }
+    }
+
+    /**
+     * Parses a single {@code char[]} record and writes it to {@code out} unless all of its fields are null.
+     */
+    @Override
+    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws Exception {
+        cursor.nextRecord(record.get(), record.size());
+        parseRecord();
+        if (!areAllNullFields) {
+            recBuilder.write(out, true);
+        }
+    }
+
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Binds the parser to a new input stream, skipping the header line when {@code hasHeader} is set.
+     */
+    @Override
+    public void setInputStream(InputStream in) throws Exception {
+        // new InputStreamReader(null) throws NullPointerException, so only wrap a non-null stream; the cursor
+        // itself accepts a null reader (configure() creates one that way).
+        // NOTE(review): InputStreamReader uses the platform default charset here — confirm inputs match it.
+        cursor = new FieldCursorForDelimitedDataParser(in == null ? null : new InputStreamReader(in),
+                fieldDelimiter, quote);
+        if (in != null && hasHeader) {
+            cursor.nextRecord();
+            while (cursor.nextField()) {
+                // discard header fields
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
new file mode 100644
index 0000000..fb61339
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+@SuppressWarnings("deprecation")
+public class HiveRecordParser implements IRecordDataParser<Writable> {
+
+    private ARecordType aRecord;
+    private SerDe hiveSerde;
+    private StructObjectInspector oi;
+    private IARecordBuilder recBuilder;
+    private ArrayBackedValueStorage fieldValueBuffer;
+    private ArrayBackedValueStorage listItemBuffer;
+    private byte[] fieldTypeTags;
+    private IAType[] fieldTypes;
+    private OrderedListBuilder orderedListBuilder;
+    private UnorderedListBuilder unorderedListBuilder;
+    private List<? extends StructField> fieldRefs;
+    private UTF8StringWriter utf8Writer = new UTF8StringWriter();
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType) throws HyracksDataException {
+        try {
+            this.aRecord = recordType;
+            int n = aRecord.getFieldNames().length;
+            fieldTypes = aRecord.getFieldTypes();
+            JobConf hadoopConfiguration = HDFSUtils.configureHDFSJobConf(configuration);
+            //create the hive table schema.
+            Properties tbl = new Properties();
+            tbl.put(Constants.LIST_COLUMNS, getCommaDelimitedColNames(aRecord));
+            tbl.put(Constants.LIST_COLUMN_TYPES, getColTypes(aRecord));
+            String hiveSerdeClassName = configuration.get(ExternalDataConstants.KEY_HIVE_SERDE);
+            if (hiveSerdeClassName == null) {
+                throw new IllegalArgumentException("no hive serde provided for hive deserialized records");
+            }
+            hiveSerde = (SerDe) Class.forName(hiveSerdeClassName).newInstance();
+            hiveSerde.initialize(hadoopConfiguration, tbl);
+            oi = (StructObjectInspector) hiveSerde.getObjectInspector();
+
+            fieldValueBuffer = new ArrayBackedValueStorage();
+            recBuilder = new RecordBuilder();
+            recBuilder.reset(aRecord);
+            recBuilder.init();
+            fieldTypeTags = new byte[n];
+            for (int i = 0; i < n; i++) {
+                ATypeTag tag = aRecord.getFieldTypes()[i].getTypeTag();
+                fieldTypeTags[i] = tag.serialize();
+            }
+            fieldRefs = oi.getAllStructFieldRefs();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void parse(IRawRecord<? extends Writable> record, DataOutput out) throws Exception {
+        Writable hiveRawRecord = record.get();
+        Object hiveObject = hiveSerde.deserialize(hiveRawRecord);
+        int n = aRecord.getFieldNames().length;
+        List<Object> attributesValues = oi.getStructFieldsDataAsList(hiveObject);
+        recBuilder.reset(aRecord);
+        recBuilder.init();
+        for (int i = 0; i < n; i++) {
+            final Object value = attributesValues.get(i);
+            final ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
+            fieldValueBuffer.reset();
+            final DataOutput dataOutput = fieldValueBuffer.getDataOutput();
+            dataOutput.writeByte(fieldTypeTags[i]);
+            //get field type
+            parseItem(fieldTypes[i], value, foi, dataOutput, false);
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, true);
+
+    }
+
+    private void parseItem(IAType itemType, Object value, ObjectInspector foi, DataOutput dataOutput,
+            boolean primitiveOnly) throws IOException {
+        switch (itemType.getTypeTag()) {
+            case BOOLEAN:
+                parseBoolean(value, (BooleanObjectInspector) foi, dataOutput);
+                break;
+            case TIME:
+                parseTime(value, (TimestampObjectInspector) foi, dataOutput);
+                break;
+            case DATE:
+                parseDate(value, (TimestampObjectInspector) foi, dataOutput);
+                break;
+            case DATETIME:
+                parseDateTime(value, (TimestampObjectInspector) foi, dataOutput);
+                break;
+            case DOUBLE:
+                parseDouble(value, (DoubleObjectInspector) foi, dataOutput);
+                break;
+            case FLOAT:
+                parseFloat(value, (FloatObjectInspector) foi, dataOutput);
+                break;
+            case INT8:
+                parseInt8(value, (ByteObjectInspector) foi, dataOutput);
+                break;
+            case INT16:
+                parseInt16(value, (ShortObjectInspector) foi, dataOutput);
+                break;
+            case INT32:
+                parseInt32(value, (IntObjectInspector) foi, dataOutput);
+                break;
+            case INT64:
+                parseInt64(value, (LongObjectInspector) foi, dataOutput);
+                break;
+            case STRING:
+                parseString(value, (StringObjectInspector) foi, dataOutput);
+                break;
+            case ORDEREDLIST:
+                if (primitiveOnly) {
+                    throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
+                }
+                parseOrderedList((AOrderedListType) itemType, value, (ListObjectInspector) foi);
+                break;
+            case UNORDEREDLIST:
+                if (primitiveOnly) {
+                    throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
+                }
+                parseUnorderedList((AUnorderedListType) itemType, value, (ListObjectInspector) foi);
+                break;
+            default:
+                throw new HyracksDataException("Can't get hive type for field of type " + itemType.getTypeTag());
+        }
+    }
+
+    @Override
+    public Class<? extends Writable> getRecordClass() {
+        return Writable.class;
+    }
+
+    private Object getColTypes(ARecordType record) throws Exception {
+        int n = record.getFieldTypes().length;
+        if (n < 1) {
+            throw new HyracksDataException("Failed to get columns of record");
+        }
+        //First Column
+        String cols = getHiveTypeString(record.getFieldTypes(), 0);
+        for (int i = 1; i < n; i++) {
+            cols = cols + "," + getHiveTypeString(record.getFieldTypes(), i);
+        }
+        return cols;
+    }
+
+    private String getCommaDelimitedColNames(ARecordType record) throws Exception {
+        if (record.getFieldNames().length < 1) {
+            throw new HyracksDataException("Can't deserialize hive records with no closed columns");
+        }
+
+        String cols = record.getFieldNames()[0];
+        for (int i = 1; i < record.getFieldNames().length; i++) {
+            cols = cols + "," + record.getFieldNames()[i];
+        }
+        return cols;
+    }
+
+    private String getHiveTypeString(IAType[] types, int i) throws Exception {
+        final IAType type = types[i];
+        ATypeTag tag = type.getTypeTag();
+        if (tag == ATypeTag.UNION) {
+            if (NonTaggedFormatUtil.isOptional(type)) {
+                throw new NotImplementedException("Non-optional UNION type is not supported.");
+            }
+            tag = ((AUnionType) type).getNullableType().getTypeTag();
+        }
+        if (tag == null) {
+            throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+        }
+        switch (tag) {
+            case BOOLEAN:
+                return Constants.BOOLEAN_TYPE_NAME;
+            case DATE:
+                return Constants.DATE_TYPE_NAME;
+            case DATETIME:
+                return Constants.DATETIME_TYPE_NAME;
+            case DOUBLE:
+                return Constants.DOUBLE_TYPE_NAME;
+            case FLOAT:
+                return Constants.FLOAT_TYPE_NAME;
+            case INT16:
+                return Constants.SMALLINT_TYPE_NAME;
+            case INT32:
+                return Constants.INT_TYPE_NAME;
+            case INT64:
+                return Constants.BIGINT_TYPE_NAME;
+            case INT8:
+                return Constants.TINYINT_TYPE_NAME;
+            case ORDEREDLIST:
+                return Constants.LIST_TYPE_NAME;
+            case STRING:
+                return Constants.STRING_TYPE_NAME;
+            case TIME:
+                return Constants.DATETIME_TYPE_NAME;
+            case UNORDEREDLIST:
+                return Constants.LIST_TYPE_NAME;
+            default:
+                throw new HyracksDataException("Can't get hive type for field of type " + tag);
+        }
+    }
+
+    private void parseInt64(Object obj, LongObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeLong(foi.get(obj));
+    }
+
+    private void parseInt32(Object obj, IntObjectInspector foi, DataOutput dataOutput) throws IOException {
+        if (obj == null) {
+            throw new HyracksDataException("can't parse null field");
+        }
+        dataOutput.writeInt(foi.get(obj));
+    }
+
+    private void parseInt16(Object obj, ShortObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeShort(foi.get(obj));
+    }
+
+    private void parseFloat(Object obj, FloatObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeFloat(foi.get(obj));
+    }
+
+    private void parseDouble(Object obj, DoubleObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeDouble(foi.get(obj));
+    }
+
+    private void parseDateTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeLong(foi.getPrimitiveJavaObject(obj).getTime());
+    }
+
+    private void parseDate(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
+        long chrononTimeInMs = foi.getPrimitiveJavaObject(obj).getTime();
+        short temp = 0;
+        if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
+            temp = 1;
+        }
+        dataOutput.writeInt((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - temp);
+    }
+
+    private void parseBoolean(Object obj, BooleanObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeBoolean(foi.get(obj));
+    }
+
+    private void parseInt8(Object obj, ByteObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeByte(foi.get(obj));
+    }
+
+    private void parseString(Object obj, StringObjectInspector foi, DataOutput dataOutput) throws IOException {
+        utf8Writer.writeUTF8(foi.getPrimitiveJavaObject(obj), dataOutput);
+    }
+
+    private void parseTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
+        dataOutput.writeInt((int) (foi.getPrimitiveJavaObject(obj).getTime() % 86400000));
+    }
+
+    private void parseOrderedList(AOrderedListType aOrderedListType, Object obj, ListObjectInspector foi)
+            throws IOException {
+        OrderedListBuilder orderedListBuilder = getOrderedListBuilder();
+        IAType itemType = null;
+        if (aOrderedListType != null)
+            itemType = aOrderedListType.getItemType();
+        orderedListBuilder.reset(aOrderedListType);
+
+        int n = foi.getListLength(obj);
+        for (int i = 0; i < n; i++) {
+            Object element = foi.getListElement(obj, i);
+            ObjectInspector eoi = foi.getListElementObjectInspector();
+            if (element == null) {
+                throw new HyracksDataException("can't parse hive list with null values");
+            }
+            parseItem(itemType, element, eoi, listItemBuffer.getDataOutput(), true);
+            orderedListBuilder.addItem(listItemBuffer);
+        }
+        orderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
+    }
+
+    private void parseUnorderedList(AUnorderedListType uoltype, Object obj, ListObjectInspector oi) throws IOException {
+        UnorderedListBuilder unorderedListBuilder = getUnorderedListBuilder();
+        IAType itemType = null;
+        if (uoltype != null)
+            itemType = uoltype.getItemType();
+        byte tagByte = itemType.getTypeTag().serialize();
+        unorderedListBuilder.reset(uoltype);
+
+        int n = oi.getListLength(obj);
+        for (int i = 0; i < n; i++) {
+            Object element = oi.getListElement(obj, i);
+            ObjectInspector eoi = oi.getListElementObjectInspector();
+            if (element == null) {
+                throw new HyracksDataException("can't parse hive list with null values");
+            }
+            listItemBuffer.reset();
+            final DataOutput dataOutput = listItemBuffer.getDataOutput();
+            dataOutput.writeByte(tagByte);
+            parseItem(itemType, element, eoi, dataOutput, true);
+            unorderedListBuilder.addItem(listItemBuffer);
+        }
+        unorderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
+    }
+
+    private OrderedListBuilder getOrderedListBuilder() {
+        if (orderedListBuilder != null)
+            return orderedListBuilder;
+        else {
+            orderedListBuilder = new OrderedListBuilder();
+            return orderedListBuilder;
+        }
+    }
+
+    private UnorderedListBuilder getUnorderedListBuilder() {
+        if (unorderedListBuilder != null)
+            return unorderedListBuilder;
+        else {
+            unorderedListBuilder = new UnorderedListBuilder();
+            return unorderedListBuilder;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
new file mode 100644
index 0000000..4d93dc5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.om.base.AMutableRecord;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Converts ROME {@link SyndEntryImpl} feed entries into records whose four fields are filled, in order, with a
+ * generated id, the entry title, the entry description and the entry link.
+ */
+public class RSSParser implements IRecordDataParser<SyndEntryImpl> {
+    // Number of fields produced per entry: id, title, description, link.
+    private static final int NUM_RSS_FIELDS = 4;
+    // Sequence number appended to idPrefix; incremented after every parsed entry.
+    private long id = 0;
+    // NOTE(review): never assigned in this class, so generated ids currently read "null:<n>" — confirm intent.
+    private String idPrefix;
+    private AMutableString[] mutableFields;
+    private String[] tupleFieldValues;
+    private AMutableRecord mutableRecord;
+    private RecordBuilder recordBuilder = new RecordBuilder();
+    private int numFields;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Prepares the reusable mutable record for {@code recordType}.
+     *
+     * @throws HyracksDataException if {@code recordType} does not have exactly four fields; parse() fills exactly
+     *             four values, so any other field count could never be parsed
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType)
+            throws HyracksDataException, IOException {
+        numFields = recordType.getFieldNames().length;
+        if (numFields != NUM_RSS_FIELDS) {
+            throw new HyracksDataException("RSS records must have exactly " + NUM_RSS_FIELDS
+                    + " fields (id, title, description, link) but the configured type has " + numFields);
+        }
+        mutableFields = new AMutableString[NUM_RSS_FIELDS];
+        for (int i = 0; i < NUM_RSS_FIELDS; i++) {
+            mutableFields[i] = new AMutableString(null);
+        }
+        mutableRecord = new AMutableRecord(recordType, mutableFields);
+        tupleFieldValues = new String[numFields];
+    }
+
+    /**
+     * Writes one feed entry to {@code out} as a record and advances the id sequence.
+     */
+    @Override
+    public void parse(IRawRecord<? extends SyndEntryImpl> record, DataOutput out) throws Exception {
+        SyndEntryImpl entry = record.get();
+        tupleFieldValues[0] = idPrefix + ":" + id;
+        tupleFieldValues[1] = entry.getTitle();
+        // Entries may lack a description; use an empty string instead of failing with a NullPointerException.
+        tupleFieldValues[2] = entry.getDescription() == null ? "" : entry.getDescription().getValue();
+        tupleFieldValues[3] = entry.getLink();
+        for (int i = 0; i < numFields; i++) {
+            mutableFields[i].setValue(tupleFieldValues[i]);
+            mutableRecord.setValueAtPos(i, mutableFields[i]);
+        }
+        recordBuilder.reset(mutableRecord.getType());
+        recordBuilder.init();
+        IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+        id++;
+    }
+
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
new file mode 100644
index 0000000..b9cd60b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.library.java.JObjectUtil;
+import org.apache.asterix.external.util.Datatypes.Tweet;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableRecord;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.Status;
+import twitter4j.User;
+
+public class TweetParser implements IRecordDataParser<Status> {
+
+    private IAObject[] mutableTweetFields;
+    private IAObject[] mutableUserFields;
+    private AMutableRecord mutableRecord;
+    private AMutableRecord mutableUser;
+    private final Map<String, Integer> userFieldNameMap = new HashMap<>();
+    private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
+    private RecordBuilder recordBuilder = new RecordBuilder();
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType)
+            throws HyracksDataException, IOException {
+        initFieldNames(recordType);
+        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
+                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
+        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)],
+                mutableUserFields);
+
+        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
+                new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
+        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+
+    }
+
+    // Initialize the hashmap values for the field names and positions
+    private void initFieldNames(ARecordType recordType) {
+        String tweetFields[] = recordType.getFieldNames();
+        for (int i = 0; i < tweetFields.length; i++) {
+            tweetFieldNameMap.put(tweetFields[i], i);
+            if (tweetFields[i].equals(Tweet.USER)) {
+                IAType fieldType = recordType.getFieldTypes()[i];
+                if (fieldType.getTypeTag() == ATypeTag.RECORD) {
+                    String userFields[] = ((ARecordType) fieldType).getFieldNames();
+                    for (int j = 0; j < userFields.length; j++) {
+                        userFieldNameMap.put(userFields[j], j);
+                    }
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public void parse(IRawRecord<? extends Status> record, DataOutput out) throws Exception {
+        Status tweet = record.get();
+        User user = tweet.getUser();
+        // Tweet user data
+        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)])
+                .setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
+        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)])
+                .setValue(JObjectUtil.getNormalizedString(user.getLang()));
+        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
+        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
+        ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)])
+                .setValue(JObjectUtil.getNormalizedString(user.getName()));
+        ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)])
+                .setValue(user.getFollowersCount());
+
+        // Tweet data
+        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
+
+        int userPos = tweetFieldNameMap.get(Tweet.USER);
+        for (int i = 0; i < mutableUserFields.length; i++) {
+            ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]);
+        }
+        if (tweet.getGeoLocation() != null) {
+            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)])
+                    .setValue(tweet.getGeoLocation().getLatitude());
+            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)])
+                    .setValue(tweet.getGeoLocation().getLongitude());
+        } else {
+            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
+            ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
+        }
+        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)])
+                .setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString()));
+        ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)])
+                .setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+
+        for (int i = 0; i < mutableTweetFields.length; i++) {
+            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
+        }
+        recordBuilder.reset(mutableRecord.getType());
+        recordBuilder.init();
+        IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() {
+        return Status.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
new file mode 100644
index 0000000..4634278
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.ADMDataParser;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Creates {@link ADMDataParser} instances for both record-based and stream-based data sources.
+ */
+public class ADMDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        return createParser();
+    }
+
+    /**
+     * Builds a new parser configured with this factory's configuration and record type.
+     *
+     * @throws HyracksDataException
+     *             if creating or configuring the parser fails; a HyracksDataException raised by
+     *             the parser is rethrown unchanged, any other exception is wrapped
+     */
+    private ADMDataParser createParser() throws HyracksDataException {
+        try {
+            ADMDataParser parser = new ADMDataParser();
+            parser.configure(configuration, recordType);
+            return parser;
+        } catch (Exception e) {
+            // Already the declared type: rethrow rather than nesting it inside another wrapper.
+            if (e instanceof HyracksDataException) {
+                throw (HyracksDataException) e;
+            }
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /** Returns a fresh parser; {@code partition} is not used. */
+    @Override
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return createParser();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java
new file mode 100644
index 0000000..43af455
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.types.ARecordType;
+
+/**
+ * Base class for parser factories that can produce both record parsers and stream parsers.
+ * <p>
+ * It keeps the configuration and record type handed to the factory so that subclasses can pass
+ * them on to the parsers they create.
+ *
+ * @param <T>
+ *            the raw record type consumed by the record parsers this factory creates
+ */
+public abstract class AbstractRecordStreamParserFactory<T>
+        implements IStreamDataParserFactory, IRecordDataParserFactory<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Adapter configuration supplied through {@link #configure(Map)}. */
+    protected Map<String, String> configuration;
+    /** Output record type supplied through {@link #setRecordType(ARecordType)}. */
+    protected ARecordType recordType;
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /** Derives the data source type from the stored configuration. */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return ExternalDataUtils.getDataSourceType(configuration);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
new file mode 100644
index 0000000..fa63d45
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.DelimitedDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * Creates {@link DelimitedDataParser} instances configured from the adapter configuration
+ * (delimiter, quote character and header flag) and the record type's field types.
+ */
+public class DelimitedDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException {
+        return createParser();
+    }
+
+    /** Builds a delimited parser from this factory's configuration and record type. */
+    private DelimitedDataParser createParser() throws HyracksDataException, AsterixException {
+        IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
+        // getDelimiter returns a primitive char; keep it unboxed.
+        char delimiter = getDelimiter(configuration);
+        char quote = getQuote(configuration, delimiter);
+        boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
+        DelimitedDataParser parser = new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader);
+        parser.configure(configuration, recordType);
+        return parser;
+    }
+
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /** Returns a fresh parser; {@code partition} is not used. */
+    @Override
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException, AsterixException {
+        return createParser();
+    }
+
+    /**
+     * Reads the field delimiter from {@code configuration}, falling back to
+     * {@code ExternalDataConstants.DEFAULT_DELIMITER} when none is configured.
+     *
+     * @throws AsterixException
+     *             if the configured delimiter is not exactly one character long
+     */
+    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+        String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
+        if (delimiterValue == null) {
+            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
+        } else if (delimiterValue.length() != 1) {
+            throw new AsterixException(
+                    "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1.");
+        }
+        return delimiterValue.charAt(0);
+    }
+
+    /**
+     * Reads the quote character from {@code configuration}, falling back to
+     * {@code ExternalDataConstants.DEFAULT_QUOTE} when none is configured.
+     *
+     * @param delimiter
+     *            the field delimiter in use; the quote must differ from it
+     * @throws AsterixException
+     *             if the configured quote is not exactly one character long, or if the quote
+     *             equals {@code delimiter}
+     */
+    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+        String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteValue == null) {
+            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
+        } else if (quoteValue.length() != 1) {
+            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+        }
+
+        // A char delimiter can never be null, so the only conflict to detect is a shared character.
+        if (quoteValue.charAt(0) == delimiter) {
+            throw new AsterixException(
+                    "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
+        }
+
+        return quoteValue.charAt(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
new file mode 100644
index 0000000..f07ba4c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.parser.HiveRecordParser;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Creates {@link HiveRecordParser} instances that parse Hadoop {@link Writable} records into
+ * records of the configured type.
+ */
+public class HiveDataParserFactory implements IRecordDataParserFactory<Writable> {
+
+    private static final long serialVersionUID = 1L;
+    private ARecordType recordType;
+    private Map<String, String> configuration;
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /** Hive input is always consumed as individual records. */
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /** Returns a new parser configured with the stored configuration and record type. */
+    @Override
+    public IRecordDataParser<Writable> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException {
+        HiveRecordParser parser = new HiveRecordParser();
+        parser.configure(configuration, recordType);
+        return parser;
+    }
+
+    @Override
+    public Class<? extends Writable> getRecordClass() {
+        return Writable.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
new file mode 100644
index 0000000..fecb0de
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.parser.RSSParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Creates {@link RSSParser} instances that convert {@link SyndEntryImpl} feed entries into
+ * records of the configured type.
+ */
+public class RSSParserFactory implements IRecordDataParserFactory<SyndEntryImpl> {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+    private ARecordType recordType;
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /** Feed entries are always delivered as individual records. */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /** Returns a new parser configured with the stored configuration and record type. */
+    @Override
+    public IRecordDataParser<SyndEntryImpl> createRecordParser(IHyracksTaskContext ctx)
+            throws AsterixException, IOException {
+        RSSParser parser = new RSSParser();
+        parser.configure(configuration, recordType);
+        return parser;
+    }
+
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
new file mode 100644
index 0000000..0f3b309
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.parser.TweetParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import twitter4j.Status;
+
+/**
+ * Creates {@link TweetParser} instances that convert twitter4j {@link Status} objects into
+ * records of the configured type.
+ */
+public class TweetParserFactory implements IRecordDataParserFactory<Status> {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, String> configuration;
+    private ARecordType recordType;
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /** Tweets are always delivered as individual records. */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /** Returns a new parser configured with the stored configuration and record type. */
+    @Override
+    public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) throws AsterixException, IOException {
+        TweetParser parser = new TweetParser();
+        parser.configure(configuration, recordType);
+        return parser;
+    }
+
+    @Override
+    public Class<? extends Status> getRecordClass() {
+        return Status.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
new file mode 100644
index 0000000..649ca43
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
+import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.GenericAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.external.runtime.GenericSocketFeedAdapter;
+import org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory;
+import org.apache.asterix.external.runtime.SocketClientAdapter;
+import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
+import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+
+/**
+ * Resolves adapter names (adapter class names, short aliases, and legacy class names kept for
+ * compatibility) to adapter factory classes, and instantiates configured factories.
+ */
+public class AdapterFactoryProvider {
+
+    /**
+     * Maps adapter names and aliases to factory classes. {@link #addNewAdapter} adds entries at
+     * runtime. NOTE(review): this is a plain HashMap mutated after class initialization; confirm
+     * that registration and lookup never run concurrently.
+     */
+    public static final Map<String, Class<? extends IAdapterFactory>> adapterFactories = initializeAdapterFactoryMapping();
+
+    /** Builds the built-in name-to-factory mapping. */
+    private static Map<String, Class<? extends IAdapterFactory>> initializeAdapterFactoryMapping() {
+        Map<String, Class<? extends IAdapterFactory>> factories = new HashMap<>();
+        // Class names
+        factories.put(GenericAdapter.class.getName(), GenericAdapterFactory.class);
+        factories.put(GenericSocketFeedAdapter.class.getName(), GenericSocketFeedAdapterFactory.class);
+        factories.put(SocketClientAdapter.class.getName(), SocketClientAdapterFactory.class);
+
+        // Aliases
+        factories.put(ExternalDataConstants.ALIAS_GENERIC_ADAPTER, GenericAdapterFactory.class);
+        factories.put(ExternalDataConstants.ALIAS_HDFS_ADAPTER, GenericAdapterFactory.class);
+        factories.put(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, GenericAdapterFactory.class);
+        factories.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericSocketFeedAdapterFactory.class);
+        factories.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, SocketClientAdapterFactory.class);
+        factories.put(ExternalDataConstants.ALIAS_FILE_FEED_ADAPTER, GenericAdapterFactory.class);
+
+        // Compatibility: legacy adapter class names
+        factories.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class);
+        factories.put(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME, GenericAdapterFactory.class);
+        return factories;
+    }
+
+    /**
+     * Returns the factory class registered under {@code adapterClassname}.
+     *
+     * @throws AsterixException
+     *             if no factory is registered under that name
+     */
+    private static Class<? extends IAdapterFactory> lookupFactoryClass(String adapterClassname)
+            throws AsterixException {
+        Class<? extends IAdapterFactory> factoryClass = adapterFactories.get(adapterClassname);
+        if (factoryClass == null) {
+            throw new AsterixException("Unknown adapter: " + adapterClassname);
+        }
+        return factoryClass;
+    }
+
+    /**
+     * Adds compatibility parameters to {@code configuration}, then instantiates and configures the
+     * factory registered under {@code adapterClassname}.
+     *
+     * @throws AsterixException
+     *             if the adapter name is unknown
+     */
+    public static IAdapterFactory getAdapterFactory(String adapterClassname, Map<String, String> configuration,
+            ARecordType itemType) throws Exception {
+        ExternalDataCompatibilityUtils.addCompatabilityParameters(adapterClassname, itemType, configuration);
+        IAdapterFactory adapterFactory = lookupFactoryClass(adapterClassname).newInstance();
+        adapterFactory.configure(configuration, itemType);
+        return adapterFactory;
+    }
+
+    /**
+     * Like {@link #getAdapterFactory(String, Map, ARecordType)}, but for indexing: the factory must
+     * be an {@link IIndexingAdapterFactory} and receives {@code snapshot} and {@code indexingOp}
+     * before it is configured.
+     *
+     * @throws AsterixException
+     *             if the adapter name is unknown, or if instantiating, casting or configuring the
+     *             factory fails (wrapping the cause)
+     */
+    public static IIndexingAdapterFactory getAdapterFactory(String adapterClassname, Map<String, String> configuration,
+            ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp)
+                    throws AsterixException, InstantiationException, IllegalAccessException {
+        ExternalDataCompatibilityUtils.addCompatabilityParameters(adapterClassname, itemType, configuration);
+        Class<? extends IAdapterFactory> factoryClass = lookupFactoryClass(adapterClassname);
+        try {
+            IIndexingAdapterFactory adapterFactory = (IIndexingAdapterFactory) factoryClass.newInstance();
+            adapterFactory.setSnapshot(snapshot, indexingOp);
+            adapterFactory.configure(configuration, itemType);
+            return adapterFactory;
+        } catch (Exception e) {
+            throw new AsterixException("Failed to create indexing adapter factory.", e);
+        }
+    }
+
+    /**
+     * Loads {@code adapterFactoryClassName} through the class loader of the given library and
+     * registers it under both {@code adapterClassName} and {@code adapterAlias}.
+     *
+     * @throws ClassNotFoundException
+     *             if the library's class loader cannot find the factory class
+     */
+    @SuppressWarnings("unchecked")
+    public static void addNewAdapter(String dataverseName, String adapterClassName, String adapterAlias,
+            String adapterFactoryClassName, String libraryName) throws ClassNotFoundException {
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverseName, libraryName);
+        // Unchecked: the loaded class is trusted to implement IAdapterFactory.
+        Class<? extends IAdapterFactory> adapterFactoryClass = (Class<? extends IAdapterFactory>) classLoader
+                .loadClass(adapterFactoryClassName);
+        adapterFactories.put(adapterClassName, adapterFactoryClass);
+        adapterFactories.put(adapterAlias, adapterFactoryClass);
+    }
+
+    /**
+     * Creates and configures a {@link LookupAdapterFactory} for {@code recordType}, passing through
+     * {@code ridFields}, {@code retainInput}, {@code retainNull} and {@code iNullWriterFactory}.
+     */
+    public static LookupAdapterFactory<?> getAdapterFactory(Map<String, String> configuration, ARecordType recordType,
+            int[] ridFields, boolean retainInput, boolean retainNull, INullWriterFactory iNullWriterFactory)
+                    throws Exception {
+        LookupAdapterFactory<?> adapterFactory = new LookupAdapterFactory<>(recordType, ridFields, retainInput,
+                retainNull, iNullWriterFactory);
+        adapterFactory.configure(configuration);
+        return adapterFactory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
new file mode 100644
index 0000000..68a3942
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.dataflow.IndexingDataFlowController;
+import org.apache.asterix.external.dataflow.RecordDataFlowController;
+import org.apache.asterix.external.dataflow.StreamDataFlowController;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class DataflowControllerProvider {
+
+    /**
+     * Creates and wires the data flow controller for one partition of an external data source.
+     * <p>
+     * Controllers are prepared in this order:
+     * <ol>
+     * <li>construct the controller</li>
+     * <li>{@code configure(configuration, ctx)}</li>
+     * <li>{@code setTupleForwarder(forwarder)}</li>
+     * <li>record sources: set the record reader, then the record parser; stream sources: set the stream parser</li>
+     * </ol>
+     * This method does not call {@code start(writer)} on the returned controller.
+     *
+     * @param recordType
+     *            record type used to configure the parser
+     * @param ctx
+     *            task context used to configure the controller and create the reader/stream and parser
+     * @param partition
+     *            partition whose reader or input stream is created
+     * @param dataSourceFactory
+     *            factory for the data source; must be an {@link IRecordReaderFactory} for {@code RECORDS} sources
+     *            or an {@link IInputStreamProviderFactory} for {@code STREAM} sources
+     * @param dataParserFactory
+     *            parser factory; must be an {@link IRecordDataParserFactory} for {@code RECORDS} sources or an
+     *            {@link IStreamDataParserFactory} for {@code STREAM} sources
+     * @param configuration
+     *            adapter configuration
+     * @param indexingOp
+     *            for {@code RECORDS} sources, whether to use an {@link IndexingDataFlowController}
+     * @return the configured controller
+     * @throws AsterixException
+     *             if the data source type is neither {@code RECORDS} nor {@code STREAM}
+     * @throws Exception
+     *             propagated from configuring the controller, reader, stream provider or parser
+     */
+    public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
+            int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
+            Map<String, String> configuration, boolean indexingOp) throws Exception {
+        switch (dataSourceFactory.getDataSourceType()) {
+            case RECORDS:
+                return createRecordDataFlowController(recordType, ctx, partition, dataSourceFactory,
+                        dataParserFactory, configuration, indexingOp);
+            case STREAM:
+                return createStreamDataFlowController(recordType, ctx, partition, dataSourceFactory,
+                        dataParserFactory, configuration);
+            default:
+                throw new AsterixException("Unknown data source type: " + dataSourceFactory.getDataSourceType());
+        }
+    }
+
+    /** Builds a (possibly indexing) record controller fed by a record reader and a record parser. */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static IDataFlowController createRecordDataFlowController(ARecordType recordType,
+            IHyracksTaskContext ctx, int partition, IExternalDataSourceFactory dataSourceFactory,
+            IDataParserFactory dataParserFactory, Map<String, String> configuration, boolean indexingOp)
+            throws Exception {
+        RecordDataFlowController recordDataFlowController;
+        if (indexingOp) {
+            recordDataFlowController = new IndexingDataFlowController();
+        } else {
+            recordDataFlowController = new RecordDataFlowController();
+        }
+        recordDataFlowController.configure(configuration, ctx);
+        recordDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
+        IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
+        IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
+        IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
+        IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+        dataParser.configure(configuration, recordType);
+        recordDataFlowController.setRecordReader(recordReader);
+        recordDataFlowController.setRecordParser(dataParser);
+        return recordDataFlowController;
+    }
+
+    /** Builds a stream controller whose parser reads from the partition's input stream. */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static IDataFlowController createStreamDataFlowController(ARecordType recordType,
+            IHyracksTaskContext ctx, int partition, IExternalDataSourceFactory dataSourceFactory,
+            IDataParserFactory dataParserFactory, Map<String, String> configuration) throws Exception {
+        StreamDataFlowController streamDataFlowController = new StreamDataFlowController();
+        streamDataFlowController.configure(configuration, ctx);
+        streamDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
+        IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory;
+        IInputStreamProvider streamProvider = streamProviderFactory.createInputStreamProvider(ctx, partition);
+        IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
+        streamParserFactory.configure(configuration);
+        IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
+        streamParser.configure(configuration, recordType);
+        streamParser.setInputStream(streamProvider.getInputStream());
+        streamDataFlowController.setStreamParser(streamParser);
+        return streamDataFlowController;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
new file mode 100644
index 0000000..c69e12c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+public class DatasourceFactoryProvider {
+
+    public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
+            throws Exception {
+        switch (ExternalDataUtils.getDataSourceType(configuration)) {
+            case RECORDS:
+                return DatasourceFactoryProvider.getRecordReaderFactory(configuration);
+            case STREAM:
+                return DatasourceFactoryProvider
+                        .getInputStreamFactory(configuration.get(ExternalDataConstants.KEY_STREAM), configuration);
+        }
+        return null;
+    }
+
+    public static IInputStreamProviderFactory getInputStreamFactory(String stream, Map<String, String> configuration)
+            throws Exception {
+        IInputStreamProviderFactory streamFactory;
+        if (ExternalDataUtils.isExternal(stream)) {
+            String dataverse = ExternalDataUtils.getDataverse(configuration);
+            streamFactory = ExternalDataUtils.createExternalInputStreamFactory(dataverse, stream);
+        } else {
+            switch (stream) {
+                case ExternalDataConstants.STREAM_HDFS:
+                    streamFactory = new HDFSDataSourceFactory();
+                    break;
+                case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+                    streamFactory = new LocalFSInputStreamProviderFactory();
+                    break;
+                case ExternalDataConstants.STREAM_SOCKET:
+                    streamFactory = new SocketInputStreamProviderFactory();
+                    break;
+                default:
+                    throw new AsterixException("unknown input stream factory");
+            }
+        }
+        return streamFactory;
+    }
+
+    public static IRecordReaderFactory<?> getRecordReaderFactory(Map<String, String> configuration) throws Exception {
+        String reader = configuration.get(ExternalDataConstants.KEY_READER);
+        IRecordReaderFactory<?> readerFactory;
+        if (ExternalDataUtils.isExternal(reader)) {
+            String dataverse = ExternalDataUtils.getDataverse(configuration);
+            readerFactory = ExternalDataUtils.createExternalRecordReaderFactory(dataverse, reader);
+        } else {
+            switch (reader) {
+                case ExternalDataConstants.READER_HDFS:
+                    readerFactory = new HDFSDataSourceFactory();
+                    break;
+                case ExternalDataConstants.READER_ADM:
+                case ExternalDataConstants.READER_SEMISTRUCTURED:
+                    readerFactory = new SemiStructuredRecordReaderFactory()
+                            .setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory(
+                                    ExternalDataUtils.getRecordReaderStreamName(configuration), configuration));
+                    break;
+                case ExternalDataConstants.READER_DELIMITED:
+                    readerFactory = new LineRecordReaderFactory()
+                            .setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory(
+                                    ExternalDataUtils.getRecordReaderStreamName(configuration), configuration));;
+                    break;
+                default:
+                    throw new AsterixException("unknown input stream factory");
+            }
+        }
+        return readerFactory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java
new file mode 100644
index 0000000..3c090a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.indexing.FileOffsetIndexer;
+import org.apache.asterix.external.indexing.RecordColumnarIndexer;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+public class ExternalIndexerProvider {
+
+    /**
+     * Selects the external indexer for the configured input format.
+     * <p>
+     * The {@link ExternalDataConstants#KEY_INPUT_FORMAT} value is trimmed and compared case-insensitively against
+     * both the short format names and the input format class names.
+     *
+     * @param configuration
+     *            adapter configuration
+     * @return a {@link FileOffsetIndexer} for text and sequence formats, or a {@link RecordColumnarIndexer} for
+     *         RC formats
+     * @throws AsterixException
+     *             if no input format is configured or the format is not supported
+     */
+    public static IExternalIndexer getIndexer(Map<String, String> configuration) throws AsterixException {
+        String rawInputFormat = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT);
+        if (rawInputFormat == null) {
+            // Without this check a missing parameter surfaced as a NullPointerException from trim().
+            throw new AsterixException("Unable to create indexer: no input format specified");
+        }
+        String inputFormatParameter = rawInputFormat.trim();
+        if (equalsAnyIgnoreCase(inputFormatParameter, ExternalDataConstants.INPUT_FORMAT_TEXT,
+                ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT, ExternalDataConstants.INPUT_FORMAT_SEQUENCE,
+                ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)) {
+            return new FileOffsetIndexer();
+        }
+        if (equalsAnyIgnoreCase(inputFormatParameter, ExternalDataConstants.INPUT_FORMAT_RC,
+                ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT)) {
+            return new RecordColumnarIndexer();
+        }
+        throw new AsterixException("Unable to create indexer for data with format: " + inputFormatParameter);
+    }
+
+    /** Returns true if {@code value} equals any of {@code candidates}, ignoring case. */
+    private static boolean equalsAnyIgnoreCase(String value, String... candidates) {
+        for (String candidate : candidates) {
+            if (value.equalsIgnoreCase(candidate)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
new file mode 100644
index 0000000..f5a0512
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
+import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
+import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
+import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.TweetParserFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+public class ParserFactoryProvider {
+    /**
+     * Creates the data parser factory for the given adapter configuration.
+     * <p>
+     * If the {@link ExternalDataConstants#KEY_DATA_PARSER} parameter names an external parser factory, it is
+     * created through {@code ExternalDataUtils.createExternalParserFactory} for the configuration's dataverse.
+     * Otherwise a built-in factory is chosen from the configured record format.
+     *
+     * @param configuration
+     *            adapter configuration
+     * @return the parser factory
+     * @throws InstantiationException
+     *             propagated from creating an external parser factory
+     * @throws IllegalAccessException
+     *             propagated from creating an external parser factory
+     * @throws ClassNotFoundException
+     *             propagated from creating an external parser factory
+     * @throws AsterixException
+     *             if the record format is missing or unknown
+     */
+    public static IDataParserFactory getDataParserFactory(Map<String, String> configuration)
+            throws InstantiationException, IllegalAccessException, ClassNotFoundException, AsterixException {
+        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+        if (parserFactoryName != null && ExternalDataUtils.isExternal(parserFactoryName)) {
+            return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
+                    parserFactoryName);
+        }
+        return ParserFactoryProvider.getParserFactory(configuration);
+    }
+
+    /** Maps the configured record format to a built-in parser factory. */
+    private static IDataParserFactory getParserFactory(Map<String, String> configuration) throws AsterixException {
+        String recordFormat = ExternalDataUtils.getRecordFormat(configuration);
+        if (recordFormat == null) {
+            // A switch on a null String would throw NullPointerException; report the missing format instead.
+            throw new AsterixException("Unknown data format: no record format specified");
+        }
+        switch (recordFormat) {
+            case ExternalDataConstants.FORMAT_ADM:
+            case ExternalDataConstants.FORMAT_JSON:
+                return new ADMDataParserFactory();
+            case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
+                return new DelimitedDataParserFactory();
+            case ExternalDataConstants.FORMAT_HIVE:
+                return new HiveDataParserFactory();
+            case ExternalDataConstants.FORMAT_TWEET:
+                return new TweetParserFactory();
+            case ExternalDataConstants.FORMAT_RSS:
+                return new RSSParserFactory();
+            default:
+                throw new AsterixException("Unknown data format: " + recordFormat);
+        }
+    }
+}