You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/22 23:34:55 UTC

[07/34] incubator-asterixdb git commit: Enabled Feed Tests and Added External Library tests

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
new file mode 100644
index 0000000..2882083
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
@@ -0,0 +1,1783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.builders.RecordBuilderFactory;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.classad.AMutableCharArrayString;
+import org.apache.asterix.external.classad.AMutableNumberFactor;
+import org.apache.asterix.external.classad.AttributeReference;
+import org.apache.asterix.external.classad.CaseInsensitiveString;
+import org.apache.asterix.external.classad.CharArrayLexerSource;
+import org.apache.asterix.external.classad.ClassAd;
+import org.apache.asterix.external.classad.ExprList;
+import org.apache.asterix.external.classad.ExprTree;
+import org.apache.asterix.external.classad.ExprTree.NodeKind;
+import org.apache.asterix.external.classad.ExprTreeHolder;
+import org.apache.asterix.external.classad.FileLexerSource;
+import org.apache.asterix.external.classad.FunctionCall;
+import org.apache.asterix.external.classad.InputStreamLexerSource;
+import org.apache.asterix.external.classad.Lexer;
+import org.apache.asterix.external.classad.Lexer.TokenType;
+import org.apache.asterix.external.classad.LexerSource;
+import org.apache.asterix.external.classad.Literal;
+import org.apache.asterix.external.classad.Operation;
+import org.apache.asterix.external.classad.StringLexerSource;
+import org.apache.asterix.external.classad.TokenValue;
+import org.apache.asterix.external.classad.Value;
+import org.apache.asterix.external.classad.Value.NumberFactor;
+import org.apache.asterix.external.classad.object.pool.AttributeReferencePool;
+import org.apache.asterix.external.classad.object.pool.BitSetPool;
+import org.apache.asterix.external.classad.object.pool.ClassAdPool;
+import org.apache.asterix.external.classad.object.pool.ExprHolderPool;
+import org.apache.asterix.external.classad.object.pool.ExprListPool;
+import org.apache.asterix.external.classad.object.pool.LiteralPool;
+import org.apache.asterix.external.classad.object.pool.OperationPool;
+import org.apache.asterix.external.classad.object.pool.TokenValuePool;
+import org.apache.asterix.external.classad.object.pool.ValuePool;
+import org.apache.asterix.external.parser.AbstractDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/// This reads ClassAd strings from various sources and converts them into a ClassAd.
+/// It can read from Strings, Files, and InputStreams.
+public class ClassAdParser extends AbstractDataParser implements IRecordDataParser<char[]> {
+
+    // reusable components
+    // Tokenizer shared by all parse calls; it is (re-)initialized with a lexer source before each parse.
+    private Lexer lexer = new Lexer();
+    // Source the lexer reads from; both constructors install a CharArrayLexerSource here.
+    private LexerSource currentSource = null;
+    // Cleared before each attribute's expression is parsed in asterixParseClassAd and consulted afterwards.
+    // NOTE(review): presumably set to true by the expression parser for non-literal expressions -- confirm.
+    private boolean isExpr = false;
+    // object pools
+    private final ExprHolderPool mutableExprPool = new ExprHolderPool();
+    private final TokenValuePool tokenValuePool = new TokenValuePool();
+    private final ClassAdPool classAdPool = new ClassAdPool();
+    private final ExprListPool exprListPool = new ExprListPool();
+    private final ValuePool valuePool = new ValuePool();
+    private final LiteralPool literalPool = new LiteralPool();
+    private final BitSetPool bitSetPool = new BitSetPool();
+    private final OperationPool operationPool = new OperationPool();
+    private final AttributeReferencePool attrRefPool = new AttributeReferencePool();
+    // asterix objects
+    // Record type supplied at construction time (null when the no-arg constructor is used).
+    private ARecordType recordType;
+    // Pools of AsterixDB builders/buffers handed out by getRecordBuilder(), get*ListBuilder() and getTempBuffer().
+    private IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<IARecordBuilder, ATypeTag>(
+            new RecordBuilderFactory());
+    private IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<IAsterixListBuilder, ATypeTag>(
+            new ListBuilderFactory());
+    private IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<IMutableValueStorage, ATypeTag>(
+            new AbvsBuilderFactory());
+    private ClassAd rootAd = new ClassAd(false, true);
+    // Text placed before/after the raw source of an expression when it is stored as a string literal.
+    private String exprPrefix = "expr=";
+    private String exprSuffix = "";
+    // When true, expression attributes are inserted as expression trees (and evaluated when written).
+    private boolean evaluateExpr = true;
+    // Appended to the attribute name for the string form of an expression when evaluateExpr is true.
+    private String exprFieldNameSuffix = "Expr";
+    // When true, the string form of an expression is stored even if the expression is also evaluated.
+    private boolean keepBoth = true;
+    // NOTE(review): not read in the portion of the file reviewed here -- presumably selects the old
+    // line-oriented classad format; confirm against the rest of the class.
+    private boolean oldFormat = false;
+    private StringLexerSource stringLexerSource = new StringLexerSource("");
+
+    /**
+     * Creates a parser that remembers the given record type and reads from a fresh
+     * {@link CharArrayLexerSource}.
+     *
+     * @param recordType
+     *            the record type stored in this parser
+     */
+    public ClassAdParser(ARecordType recordType) {
+        this.recordType = recordType;
+        this.currentSource = new CharArrayLexerSource();
+    }
+
+    /**
+     * Creates a parser with no record type ({@code recordType} is null) that reads from a fresh
+     * {@link CharArrayLexerSource}.
+     */
+    public ClassAdParser() {
+        this.recordType = null;
+        this.currentSource = new CharArrayLexerSource();
+    }
+
+    /***********************************
+     * AsterixDB Specific begin
+     *
+     * @throws AsterixException
+     ***********************************/
+    /**
+     * Parses a classad from the current lexer source into {@code classad} and writes it to {@code out}
+     * as a record.
+     * <p>
+     * The record is written with a null record type, i.e. every attribute is serialized as an open
+     * field; the {@code recordType} given at construction time is not consulted here.
+     *
+     * @param classad
+     *            the classad instance to populate
+     * @param out
+     *            destination of the serialized record
+     * @throws IOException
+     *             if parsing or serialization fails
+     * @throws AsterixException
+     *             propagated from record serialization
+     */
+    public void asterixParse(ClassAd classad, DataOutput out) throws IOException, AsterixException {
+        // we assume the lexer source used here is a char array
+        parseClassAd(currentSource, classad, false);
+        parseRecord(null, classad, out);
+    }
+
+    /**
+     * Hook invoked by the parsing routines right before a parse failure is reported.
+     * The implementation here does nothing.
+     *
+     * @throws IOException
+     *             declared for callers; never thrown by this empty implementation
+     */
+    public void handleErrorParsing() throws IOException {
+    }
+
+    /**
+     * Parses one classad of the form {@code LEX_OPEN_BOX (name = expr ;)* LEX_CLOSE_BOX} from
+     * {@code currentSource} into {@code ad}.
+     * <p>
+     * The ad is cleared first. For every attribute the expression is parsed into a pooled holder. If the
+     * {@code isExpr} flag is set after parsing and either expressions are not evaluated or both forms are
+     * kept, the raw source text of the expression (wrapped in {@code exprPrefix}/{@code exprSuffix}) is
+     * additionally stored as a string literal: under the attribute's own name when {@code evaluateExpr} is
+     * false, otherwise under the attribute name with {@code exprFieldNameSuffix} appended. The parsed tree
+     * itself is inserted unless it is an expression and {@code evaluateExpr} is false.
+     * <p>
+     * The loop stops when the closing token is peeked; that token is not consumed here.
+     *
+     * @param ad
+     *            the classad to populate
+     * @return false if the first token is not {@code LEX_OPEN_BOX}, true otherwise
+     * @throws IOException
+     *             a HyracksDataException is thrown for a missing identifier or '=', an empty expression,
+     *             a failed insert, or an unexpected token after an attribute
+     */
+    private boolean asterixParseClassAd(ClassAd ad) throws IOException {
+        TokenType tt;
+        ad.clear();
+        lexer.initialize(currentSource);
+        if ((tt = lexer.consumeToken()) != TokenType.LEX_OPEN_BOX) {
+            handleErrorParsing();
+            return false;
+        }
+        tt = lexer.peekToken();
+        // a single token value and a single expression holder are reused for all attributes
+        TokenValue tv = tokenValuePool.get();
+        ExprTreeHolder tree = mutableExprPool.get();
+        while (tt != TokenType.LEX_CLOSE_BOX) {
+            // Get the name of the expression
+            tv.reset();
+            tree.reset();
+            tt = lexer.consumeToken(tv);
+            if (tt == TokenType.LEX_SEMICOLON) {
+                // We allow empty expressions, so if someone give a double
+                // semicolon, it doesn't
+                // hurt. Technically it's not right, but we shouldn't make users
+                // pay the price for
+                // a meaningless mistake. See condor-support #1881 for a user
+                // that was bitten by this.
+                continue;
+            }
+            if (tt != TokenType.LEX_IDENTIFIER) {
+                throw new HyracksDataException(
+                        "while parsing classad:  expected LEX_IDENTIFIER " + " but got " + Lexer.strLexToken(tt));
+            }
+
+            // consume the intermediate '='
+            if ((tt = lexer.consumeToken()) != TokenType.LEX_BOUND_TO) {
+                throw new HyracksDataException(
+                        "while parsing classad:  expected LEX_BOUND_TO " + " but got " + Lexer.strLexToken(tt));
+            }
+
+            // remember where the expression text starts so its source can be captured verbatim below
+            int positionBefore = lexer.getLexSource().getPosition();
+            isExpr = false;
+            // parse the expression
+            parseExpression(tree);
+            if (tree.getInnerTree() == null) {
+                handleErrorParsing();
+                throw new HyracksDataException("parse expression returned empty tree");
+            }
+
+            if ((!evaluateExpr || keepBoth) && isExpr && positionBefore >= 0) {
+                // we will store a string representation of the expression
+                int len = lexer.getLexSource().getPosition() - positionBefore;
+                // add it as it is to the classAd
+                Literal lit = literalPool.get();
+                Value exprVal = valuePool.get();
+                exprVal.setStringValue(exprPrefix
+                        + String.valueOf(lexer.getLexSource().getBuffer(), positionBefore, len) + exprSuffix);
+                Literal.createLiteral(lit, exprVal, NumberFactor.NO_FACTOR);
+                if (!evaluateExpr) {
+                    // the string form replaces the attribute
+                    ad.insert(tv.getStrValue().toString(), lit);
+                } else {
+                    // the string form is kept next to the evaluated attribute under a suffixed name
+                    ad.insert(tv.getStrValue().toString() + exprFieldNameSuffix, lit);
+                }
+            }
+            if (!isExpr || (evaluateExpr)) {
+                // insert the attribute into the classad
+                if (!ad.insert(tv.getStrValue().toString(), tree)) {
+                    handleErrorParsing();
+                    throw new HyracksDataException("Couldn't insert value to classad");
+                }
+            }
+            // the next token must be a ';' or a ']'
+            tt = lexer.peekToken();
+            if (tt != TokenType.LEX_SEMICOLON && tt != TokenType.LEX_CLOSE_BOX) {
+                handleErrorParsing();
+                throw new HyracksDataException("while parsing classad:  expected LEX_SEMICOLON or "
+                        + "LEX_CLOSE_BOX but got " + Lexer.strLexToken(tt));
+            }
+
+            // Slurp up any extra semicolons. This does not duplicate the work
+            // at the top of the loop
+            // because it accounts for the case where the last expression has
+            // extra semicolons,
+            // while the first case accounts for optional beginning semicolons.
+            while (tt == TokenType.LEX_SEMICOLON) {
+                lexer.consumeToken();
+                tt = lexer.peekToken();
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Reads the next line from {@code buffer}, starting at {@code offset} and never looking at or beyond
+     * {@code maxOffset}.
+     * <p>
+     * On success {@code offset} is advanced to the position right after the line terminator (or one
+     * past {@code maxOffset} when the line was ended by the limit rather than by a newline).
+     *
+     * @param buffer
+     *            the characters to read from
+     * @param offset
+     *            in/out position of the first character of the line
+     * @param maxOffset
+     *            exclusive upper bound of the readable region
+     * @return the line without its terminating {@code '\n'}, or null if the line is empty (including the
+     *         case where {@code offset} is already at {@code maxOffset}); {@code offset} is left untouched
+     *         in that case
+     */
+    public static String readLine(char[] buffer, AMutableInt32 offset, int maxOffset) {
+        final int start = offset.getIntegerValue();
+        int position = start;
+        // The bound must be tested before the buffer is indexed; the reversed order read
+        // buffer[maxOffset] and failed with ArrayIndexOutOfBoundsException when the last line
+        // had no trailing newline and maxOffset == buffer.length.
+        while (position < maxOffset && buffer[position] != '\n') {
+            position++;
+        }
+        if (position == start) {
+            return null;
+        }
+        String line = String.valueOf(buffer, start, position - start);
+        // skip the terminator
+        offset.setValue(position + 1);
+        return line;
+    }
+
+    // Reusable INT32 holder; used when a real value is written into an INT32 field.
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+
+    /**
+     * Resets every object pool owned by this parser (AsterixDB builders and buffers as well as the
+     * classad object pools) so that the pooled elements can be re-used.
+     * Intended to be called before parsing a top-level record.
+     */
+    private void resetPools() {
+        listBuilderPool.reset();
+        recordBuilderPool.reset();
+        abvsBuilderPool.reset();
+        mutableExprPool.reset();
+        tokenValuePool.reset();
+        classAdPool.reset();
+        exprListPool.reset();
+        valuePool.reset();
+        literalPool.reset();
+        bitSetPool.reset();
+        operationPool.reset();
+        attrRefPool.reset();
+    }
+
+    /**
+     * Determines the type tag a value of tag {@code expectedTypeTag} should be written as for a field
+     * declared with {@code aObjectType}.
+     *
+     * @param expectedTypeTag
+     *            the tag of the value at hand
+     * @param aObjectType
+     *            the declared type of the target field, or null if the field is undeclared
+     * @return {@code expectedTypeTag} when no type is declared; otherwise the declared tag (or, for a
+     *         union, the first member tag) that the value can be promoted or demoted to; null when no
+     *         such tag exists
+     * @throws IOException
+     *             declared for callers
+     */
+    private ATypeTag getTargetTypeTag(ATypeTag expectedTypeTag, IAType aObjectType) throws IOException {
+        // undeclared field: anything goes
+        if (aObjectType == null) {
+            return expectedTypeTag;
+        }
+
+        // union: the first convertible member wins
+        if (aObjectType.getTypeTag() == ATypeTag.UNION) {
+            for (IAType member : ((AUnionType) aObjectType).getUnionList()) {
+                final ATypeTag memberTag = member.getTypeTag();
+                boolean convertible = ATypeHierarchy.canPromote(expectedTypeTag, memberTag)
+                        || ATypeHierarchy.canDemote(expectedTypeTag, memberTag);
+                if (convertible) {
+                    return memberTag;
+                }
+            }
+            return null;
+        }
+
+        // plain declared type
+        final ATypeTag declaredTag = aObjectType.getTypeTag();
+        boolean convertible = ATypeHierarchy.canPromote(expectedTypeTag, declaredTag)
+                || ATypeHierarchy.canDemote(expectedTypeTag, declaredTag);
+        return convertible ? declaredTag : null;
+    }
+
+    /**
+     * Serializes the attributes of {@code pAd} as a record and writes it to {@code out}.
+     * <p>
+     * With a null {@code recType} every attribute is written as an open field. Otherwise attributes that
+     * match a declared field are written by field id, unknown attributes are written as open fields when
+     * the type is open and rejected when it is closed. Open fields and optional declared fields whose
+     * serialized value starts with the NULL type tag are skipped. After all attributes are written, the
+     * declared fields that were not provided are checked against the type's null constraints.
+     *
+     * @param recType
+     *            the declared record type, or null for a fully open record
+     * @param pAd
+     *            the classad whose attributes are serialized
+     * @param out
+     *            destination of the serialized record
+     * @throws IOException
+     *             a HyracksDataException is thrown for an extra field in a closed record or a missing
+     *             non-nullable field; serialization errors are propagated
+     * @throws AsterixException
+     *             propagated from the record builder / value serialization
+     */
+    private void parseRecord(ARecordType recType, ClassAd pAd, DataOutput out) throws IOException, AsterixException {
+        ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
+        ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
+        IARecordBuilder recBuilder = getRecordBuilder();
+        // tracks which declared fields were provided; only needed when a type is declared
+        BitSet nulls = null;
+        if (recType != null) {
+            nulls = getBitSet();
+            recBuilder.reset(recType);
+        } else {
+            recBuilder.reset(null);
+        }
+        recBuilder.init();
+        // primitive on purpose: a boxed Boolean here was auto(un)boxed on every attribute
+        boolean openRecordField = false;
+        int fieldId = 0;
+        IAType fieldType = null;
+
+        Map<CaseInsensitiveString, ExprTree> attrs = pAd.getAttrList();
+        for (Entry<CaseInsensitiveString, ExprTree> entry : attrs.entrySet()) {
+            // reset buffers
+            fieldNameBuffer.reset();
+            fieldValueBuffer.reset();
+            // take care of field name
+            String fldName = entry.getKey().get();
+            if (recType == null) {
+                // no declared type: every attribute is an open field
+                aStringFieldName.setValue(fldName);
+                stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
+                openRecordField = true;
+                fieldType = null;
+            } else {
+                fieldId = recBuilder.getFieldId(fldName);
+                if (fieldId < 0) {
+                    if (!recType.isOpen()) {
+                        throw new HyracksDataException("This record is closed, you can not add extra fields !!");
+                    }
+                    // undeclared attribute of an open type
+                    aStringFieldName.setValue(fldName);
+                    // NOTE(review): debugging aid kept from the original -- flags field names that look
+                    // like an object's default toString() output
+                    if (aStringFieldName.getStringValue().contains("org.apache.asterix.external.classad.TokenValue")) {
+                        System.err.println("we have a problem");
+                    }
+                    stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
+                    openRecordField = true;
+                    fieldType = null;
+                } else {
+                    // a closed field
+                    nulls.set(fieldId);
+                    fieldType = recType.getFieldTypes()[fieldId];
+                    openRecordField = false;
+                }
+            }
+
+            // add field value to value buffer
+            writeFieldValueToBuffer(fieldType, fieldValueBuffer.getDataOutput(), fldName, entry.getValue(), pAd);
+            if (openRecordField) {
+                // open fields holding NULL are dropped
+                if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
+                    recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
+                }
+            } else if (NonTaggedFormatUtil.isOptional(fieldType)) {
+                // optional declared fields holding NULL are dropped as well
+                if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
+                    recBuilder.addField(fieldId, fieldValueBuffer);
+                }
+            } else {
+                recBuilder.addField(fieldId, fieldValueBuffer);
+            }
+        }
+
+        if (recType != null) {
+            int nullableFieldId = checkNullConstraints(recType, nulls);
+            if (nullableFieldId != -1) {
+                throw new HyracksDataException(
+                        "Field: " + recType.getFieldNames()[nullableFieldId] + " can not be null");
+            }
+        }
+        recBuilder.write(out, true);
+    }
+
+    /**
+     * Serializes the value of one classad attribute (or list item) to {@code out}.
+     * <p>
+     * Literal nodes are written directly. Any other node kind is evaluated through the enclosing classad;
+     * when evaluation fails, the string form of the expression that the parser stored under
+     * {@code name + exprFieldNameSuffix} is written instead. The resulting value is then serialized
+     * according to its value type, after checking it against {@code fieldType}.
+     *
+     * @param fieldType
+     *            the declared type of the target field/item, or null if undeclared
+     * @param out
+     *            destination of the serialized value
+     * @param name
+     *            the attribute name; null for list items
+     * @param tree
+     *            the expression tree holding the value
+     * @param pAd
+     *            the classad that owns the attribute; null for list items
+     * @throws IOException
+     *             a HyracksDataException is thrown for unknown node kinds or value types, for a mismatch
+     *             with {@code fieldType}, and when a non-literal expression can neither be evaluated nor
+     *             replaced by its stored string form
+     * @throws AsterixException
+     *             propagated from nested record/list serialization
+     */
+    private void writeFieldValueToBuffer(IAType fieldType, DataOutput out, String name, ExprTree tree, ClassAd pAd)
+            throws IOException, AsterixException {
+        Value val;
+        switch (tree.getKind()) {
+            case ATTRREF_NODE:
+            case CLASSAD_NODE:
+            case EXPR_ENVELOPE:
+            case EXPR_LIST_NODE:
+            case FN_CALL_NODE:
+            case OP_NODE:
+                // list items are passed without an owning classad; fail with a clear message
+                // rather than a NullPointerException
+                if (pAd == null) {
+                    throw new HyracksDataException(
+                            "Cannot evaluate an expression of kind " + tree.getKind() + " without an enclosing classad");
+                }
+                val = valuePool.get();
+                if (!pAd.evaluateAttr(name, val)) {
+                    // just write the expr
+                    val = lookupStoredExpression(pAd, name + exprFieldNameSuffix);
+                }
+                break;
+            case LITERAL_NODE:
+                val = ((Literal) tree).getValue();
+                break;
+            default:
+                throw new HyracksDataException("Unknown Expression type detected: " + tree.getKind());
+        }
+
+        switch (val.getValueType()) {
+            case ABSOLUTE_TIME_VALUE:
+                if (checkType(ATypeTag.DATETIME, fieldType)) {
+                    parseDateTime(val, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case BOOLEAN_VALUE:
+                if (checkType(ATypeTag.BOOLEAN, fieldType)) {
+                    booleanSerde.serialize(val.getBoolVal() ? ABoolean.TRUE : ABoolean.FALSE, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case CLASSAD_VALUE:
+                if (checkType(ATypeTag.RECORD, fieldType)) {
+                    IAType objectType = getComplexType(fieldType, ATypeTag.RECORD);
+                    ClassAd classad = val.getClassadVal();
+                    parseRecord((ARecordType) objectType, classad, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case ERROR_VALUE:
+            case STRING_VALUE:
+            case UNDEFINED_VALUE:
+                // error and undefined values are written as the strings "error" / "undefined"
+                if (checkType(ATypeTag.STRING, fieldType)) {
+                    parseString(val, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case INTEGER_VALUE:
+                if (checkType(ATypeTag.INT64, fieldType)) {
+                    aInt64.setValue(val.getLongVal());
+                    int64Serde.serialize(aInt64, out);
+                } else if (checkType(ATypeTag.DOUBLE, fieldType)) {
+                    aDouble.setValue(val.getLongVal());
+                    doubleSerde.serialize(aDouble, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case LIST_VALUE:
+            case SLIST_VALUE:
+                IAType objectType;
+                // an undeclared list is written as an unordered list
+                if (checkType(ATypeTag.UNORDEREDLIST, fieldType)) {
+                    objectType = getComplexType(fieldType, ATypeTag.UNORDEREDLIST);
+                    parseUnorderedList((AUnorderedListType) objectType, val, out);
+                } else if (checkType(ATypeTag.ORDEREDLIST, fieldType)) {
+                    objectType = getComplexType(fieldType, ATypeTag.ORDEREDLIST);
+                    parseOrderedList((AOrderedListType) objectType, val, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case REAL_VALUE:
+                if (checkType(ATypeTag.DOUBLE, fieldType)) {
+                    aDouble.setValue(val.getDoubleVal());
+                    doubleSerde.serialize(aDouble, out);
+                } else if (checkType(ATypeTag.INT32, fieldType)) {
+                    aInt32.setValue((int) val.getDoubleVal());
+                    int32Serde.serialize(aInt32, out);
+                } else if (checkType(ATypeTag.INT64, fieldType)) {
+                    aInt64.setValue((long) val.getDoubleVal());
+                    int64Serde.serialize(aInt64, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            case RELATIVE_TIME_VALUE:
+                if (checkType(ATypeTag.DURATION, fieldType)) {
+                    parseDuration(val, out);
+                } else {
+                    throw new HyracksDataException(mismatchErrorMessage + fieldType.getTypeTag());
+                }
+                break;
+            default:
+                throw new HyracksDataException("unknown data type " + val.getValueType());
+        }
+    }
+
+    /**
+     * Finds the string literal that holds the source text of an unevaluated expression.
+     * <p>
+     * The attribute map is keyed by {@link CaseInsensitiveString}, so a lookup with a plain
+     * {@code String} key is not guaranteed to match; when the direct lookup misses, the entries are
+     * scanned and compared by name.
+     * NOTE(review): the scan compares names ignoring case, on the assumption that this is what
+     * CaseInsensitiveString keys imply -- confirm.
+     *
+     * @param pAd
+     *            the classad to search
+     * @param exprFieldName
+     *            the name the expression text was stored under
+     * @return the value of the stored literal
+     * @throws HyracksDataException
+     *             if no literal is stored under that name
+     */
+    private Value lookupStoredExpression(ClassAd pAd, String exprFieldName) throws HyracksDataException {
+        Map<CaseInsensitiveString, ExprTree> attrs = pAd.getAttrList();
+        ExprTree stored = attrs.get(exprFieldName);
+        if (stored == null) {
+            for (Entry<CaseInsensitiveString, ExprTree> entry : attrs.entrySet()) {
+                if (exprFieldName.equalsIgnoreCase(entry.getKey().get())) {
+                    stored = entry.getValue();
+                    break;
+                }
+            }
+        }
+        if (!(stored instanceof Literal)) {
+            throw new HyracksDataException(
+                    "Expression could not be evaluated and no string form is stored under: " + exprFieldName);
+        }
+        return ((Literal) stored).getValue();
+    }
+
+    private void parseOrderedList(AOrderedListType oltype, Value listVal, DataOutput out)
+            throws IOException, AsterixException {
+        ArrayBackedValueStorage itemBuffer = getTempBuffer();
+        OrderedListBuilder orderedListBuilder = (OrderedListBuilder) getOrderedListBuilder();
+        IAType itemType = null;
+        if (oltype != null) {
+            itemType = oltype.getItemType();
+        }
+        orderedListBuilder.reset(oltype);
+        for (ExprTree tree : listVal.getListVal().getExprList()) {
+            itemBuffer.reset();
+            writeFieldValueToBuffer(itemType, itemBuffer.getDataOutput(), null, tree, null);
+            orderedListBuilder.addItem(itemBuffer);
+        }
+        orderedListBuilder.write(out, true);
+    }
+
+    private void parseUnorderedList(AUnorderedListType uoltype, Value listVal, DataOutput out)
+            throws IOException, AsterixException {
+        ArrayBackedValueStorage itemBuffer = getTempBuffer();
+        UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
+        IAType itemType = null;
+        if (uoltype != null) {
+            itemType = uoltype.getItemType();
+        }
+        unorderedListBuilder.reset(uoltype);
+        for (ExprTree tree : listVal.getListVal().getExprList()) {
+            itemBuffer.reset();
+            writeFieldValueToBuffer(itemType, itemBuffer.getDataOutput(), null, tree, null);
+            unorderedListBuilder.addItem(itemBuffer);
+        }
+        unorderedListBuilder.write(out, true);
+    }
+
+    /**
+     * Serializes a string-like classad value as a string.
+     * String values are written as they are; error and undefined values are written as the fixed
+     * strings "error" and "undefined".
+     *
+     * @param val
+     *            the value to write
+     * @param out
+     *            destination of the serialized string
+     * @throws HyracksDataException
+     *             if the value is of any other type, or if serialization fails
+     */
+    private void parseString(Value val, DataOutput out) throws HyracksDataException {
+        switch (val.getValueType()) {
+            case STRING_VALUE:
+                // the common case: a real string
+                aString.setValue(val.getStringVal());
+                break;
+            case UNDEFINED_VALUE:
+                aString.setValue("undefined");
+                break;
+            case ERROR_VALUE:
+                aString.setValue("error");
+                break;
+            default:
+                throw new HyracksDataException("Unknown String type " + val.getValueType());
+        }
+        stringSerde.serialize(aString, out);
+    }
+
+    /**
+     * Serializes a relative-time classad value as a duration.
+     *
+     * @param duration
+     *            the value whose relative time is written
+     * @param out
+     *            destination of the serialized duration
+     * @throws HyracksDataException
+     *             wrapping any exception raised while setting or serializing the duration
+     */
+    protected void parseDuration(Value duration, DataOutput out) throws HyracksDataException {
+        try {
+            // NOTE(review): the leading 0 is presumably the months component of the duration -- confirm
+            aDuration.setValue(0, duration.getTimeVal().getRelativeTime());
+            durationSerde.serialize(aDuration, out);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Serializes an absolute-time classad value as a datetime, using the value's time in milliseconds.
+     *
+     * @param datetime
+     *            the value whose time is written
+     * @param out
+     *            destination of the serialized datetime
+     * @throws HyracksDataException
+     *             if serialization fails
+     */
+    protected void parseDateTime(Value datetime, DataOutput out) throws HyracksDataException {
+        aDateTime.setValue(datetime.getTimeVal().getTimeInMillis());
+        datetimeSerde.serialize(aDateTime, out);
+    }
+
+    /**
+     * Extracts the type carrying the given tag from a declared type.
+     *
+     * @param aObjectType
+     *            the declared type; may be null
+     * @param tag
+     *            the tag to look for
+     * @return {@code aObjectType} itself if it has the tag; the first union member with the tag if
+     *         {@code aObjectType} is a union; null if {@code aObjectType} is null or nothing matches
+     */
+    public static IAType getComplexType(IAType aObjectType, ATypeTag tag) {
+        if (aObjectType == null) {
+            return null;
+        }
+        if (aObjectType.getTypeTag() == tag) {
+            return aObjectType;
+        }
+        if (aObjectType.getTypeTag() != ATypeTag.UNION) {
+            // neither the requested type nor a union that could contain it
+            return null;
+        }
+        for (IAType member : ((AUnionType) aObjectType).getUnionList()) {
+            if (member.getTypeTag() == tag) {
+                return member;
+            }
+        }
+        // a union without a member of the requested tag
+        return null;
+    }
+
+    // Prefix of the error raised when a value does not fit the declared field type; the declared
+    // type tag is appended by the caller.
+    private String mismatchErrorMessage = "Mismatch Type, expecting a value of type ";
+    // NOTE(review): not used in the portion of the file reviewed here.
+    private Map<String, String> configuration;
+
+    /**
+     * Tells whether a value of tag {@code expectedTypeTag} can be written into a field declared with
+     * {@code aObjectType}, i.e. whether {@link #getTargetTypeTag} finds a target tag.
+     * A null {@code aObjectType} always yields true.
+     */
+    private boolean checkType(ATypeTag expectedTypeTag, IAType aObjectType) throws IOException {
+        return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
+    }
+
+    /** Obtains a bit set from the bit set pool. */
+    private BitSet getBitSet() {
+        return bitSetPool.get();
+    }
+
+    /**
+     * Verifies that every declared field that was not provided is allowed to be absent.
+     * <p>
+     * A missing field is acceptable only if its declared type is NULL or a union that has NULL among
+     * its members.
+     *
+     * @param recType
+     *            the declared record type
+     * @param nulls
+     *            bit set in which bit {@code i} is set when field {@code i} was provided
+     * @return the index of the first missing field that must not be null, or -1 if there is none
+     */
+    public static int checkNullConstraints(ARecordType recType, BitSet nulls) {
+        final IAType[] fieldTypes = recType.getFieldTypes();
+        for (int i = 0; i < fieldTypes.length; i++) {
+            if (nulls.get(i)) {
+                // the field was provided
+                continue;
+            }
+            final IAType type = fieldTypes[i];
+            final ATypeTag tag = type.getTypeTag();
+            if (tag == ATypeTag.NULL) {
+                continue;
+            }
+            if (tag != ATypeTag.UNION) {
+                return i;
+            }
+            // The nullable flag is computed per field. Previously it was shared across iterations,
+            // so one nullable union made every later non-nullable union pass the check.
+            boolean nullable = false;
+            for (IAType member : ((AUnionType) type).getUnionList()) {
+                if (member.getTypeTag() == ATypeTag.NULL) {
+                    nullable = true;
+                    break;
+                }
+            }
+            if (!nullable) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Allocates a record builder from the record builder pool. */
+    private IARecordBuilder getRecordBuilder() {
+        return recordBuilderPool.allocate(ATypeTag.RECORD);
+    }
+
+    /** Allocates a list builder for ordered lists from the list builder pool. */
+    private IAsterixListBuilder getOrderedListBuilder() {
+        return listBuilderPool.allocate(ATypeTag.ORDEREDLIST);
+    }
+
+    /** Allocates a list builder for unordered lists from the list builder pool. */
+    private IAsterixListBuilder getUnorderedListBuilder() {
+        return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
+    }
+
+    /** Allocates a scratch buffer from the value storage pool (requested with the BINARY tag). */
+    private ArrayBackedValueStorage getTempBuffer() {
+        return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
+    }
+
+    /**
+     * Returns the AsterixDB type tag matching the value held by the given literal.
+     *
+     * @throws HyracksDataException
+     *             if the value type has no matching tag
+     */
+    public static ATypeTag getMatchingType(Literal lit) throws HyracksDataException {
+        return getMatchingType(lit.getValue());
+    }
+
+    /**
+     * Maps the value type of a classad value to the AsterixDB type tag it is represented with.
+     * Error and undefined values map to STRING; both list kinds map to UNORDEREDLIST.
+     *
+     * @param val
+     *            the value to classify
+     * @return the matching type tag
+     * @throws HyracksDataException
+     *             if the value type has no matching tag
+     */
+    public static ATypeTag getMatchingType(Value val) throws HyracksDataException {
+        final ATypeTag matched;
+        switch (val.getValueType()) {
+            // scalars
+            case BOOLEAN_VALUE:
+                matched = ATypeTag.BOOLEAN;
+                break;
+            case INTEGER_VALUE:
+                matched = ATypeTag.INT64;
+                break;
+            case REAL_VALUE:
+                matched = ATypeTag.DOUBLE;
+                break;
+            case NULL_VALUE:
+                matched = ATypeTag.NULL;
+                break;
+            // string-like
+            case STRING_VALUE:
+            case ERROR_VALUE:
+            case UNDEFINED_VALUE:
+                matched = ATypeTag.STRING;
+                break;
+            // temporal
+            case ABSOLUTE_TIME_VALUE:
+                matched = ATypeTag.DATETIME;
+                break;
+            case RELATIVE_TIME_VALUE:
+                matched = ATypeTag.DURATION;
+                break;
+            // nested
+            case CLASSAD_VALUE:
+                matched = ATypeTag.RECORD;
+                break;
+            case LIST_VALUE:
+            case SLIST_VALUE:
+                matched = ATypeTag.UNORDEREDLIST;
+                break;
+            default:
+                throw new HyracksDataException("Unknown data type");
+        }
+        return matched;
+    }
+
+    /********************************
+     * End of AsterixDB specifics
+     ********************************/
+
+    /**
+     * Parses a ClassAd from a string. The string becomes the parser's current lexer source.
+     *
+     * @param buffer
+     *            Buffer containing the string representation of the classad.
+     * @param full
+     *            If this parameter is true, the parse is considered to succeed
+     *            only if the ClassAd was parsed successfully and no other
+     *            tokens follow the ClassAd.
+     * @return the parsed ClassAd object if successful, or null otherwise
+     * @throws IOException
+     *             if reading from the source fails
+     */
+    public ClassAd parseClassAd(String buffer, boolean full) throws IOException {
+        currentSource = new StringLexerSource(buffer);
+        return parseClassAd(currentSource, full);
+    }
+
+    /**
+     * Parses a ClassAd from the beginning of a string and reports where the lexer source stopped.
+     * The string becomes the parser's current lexer source; the incoming value of {@code offset}
+     * is not used as a start position.
+     *
+     * @param buffer
+     *            Buffer containing the string representation of the classad.
+     * @param offset
+     *            output only: set to the source's current location after the parse
+     * @return the parsed ClassAd, or null if parsing failed
+     * @throws IOException
+     *             if reading from the source fails
+     */
+    public ClassAd parseClassAd(String buffer, AMutableInt32 offset) throws IOException {
+        currentSource = new StringLexerSource(buffer);
+        ClassAd ad = parseClassAd((StringLexerSource) currentSource);
+        offset.setValue(((StringLexerSource) currentSource).getCurrentLocation());
+        return ad;
+    }
+
+    /**
+     * Parses a ClassAd from the given string source without requiring that the whole input is
+     * consumed ({@code full} is false).
+     *
+     * @return the parsed ClassAd, or null if parsing failed
+     */
+    public ClassAd parseClassAd(StringLexerSource lexer_source) throws IOException {
+        return parseClassAd(lexer_source, false);
+    }
+
+    /**
+     * Parses a ClassAd from a file.
+     * NOTE(review): the FileLexerSource created here is not closed by this method -- confirm whether
+     * it holds a resource that needs releasing.
+     *
+     * @param file
+     *            the file to read the classad from
+     * @param full
+     *            if true, the parse only succeeds when no other tokens follow the ClassAd
+     * @return the parsed ClassAd, or null if parsing failed
+     */
+    public ClassAd parseClassAd(File file, boolean full) throws IOException {
+        FileLexerSource fileLexerSource = new FileLexerSource(file);
+        return parseClassAd(fileLexerSource, full);
+    }
+
+    /**
+     * Parses a ClassAd from an input stream.
+     *
+     * @param in
+     *            the stream to read the classad from
+     * @param full
+     *            if true, the parse only succeeds when no other tokens follow the ClassAd
+     * @return the parsed ClassAd, or null if parsing failed
+     */
+    public ClassAd parseClassAd(InputStream in, boolean full) throws IOException {
+        InputStreamLexerSource lexer_source = new InputStreamLexerSource(in);
+        return parseClassAd(lexer_source, full);
+    }
+
+    // preferred method since the parser doesn't need to create an object
+    /**
+     * Parses a ClassAd from the given source into a caller-supplied instance.
+     * <p>
+     * The instance is reset first. Nothing is reported back to the caller: if the lexer cannot be
+     * initialized or the parse fails, the method simply returns.
+     *
+     * @param ad
+     *            the classad to populate
+     * @param lexer_source
+     *            the source to read from
+     * @param full
+     *            if true, the parse only succeeds when no other tokens follow the ClassAd
+     */
+    public void parseClassAd(ClassAd ad, LexerSource lexer_source, boolean full) throws IOException {
+        ad.reset();
+        if (lexer.initialize(lexer_source)) {
+            if (!parseClassAd(ad, full)) {
+                return;
+            } else if (lexer_source.readPreviousCharacter() != '\0') {
+                // The lexer swallows one extra character, so if we have
+                // two classads back to back we need to make sure to unread
+                // one of the characters.
+                lexer_source.unreadCharacter();
+            }
+        }
+    }
+
+    /**
+     * Parses a ClassAd from {@code lexer_source} into an object taken from {@code classAdPool}.
+     * Prints a notice on standard output on every call, steering callers towards the overload that
+     * takes a mutable ClassAd instance.
+     *
+     * @param lexer_source
+     *            the source to read from
+     * @param full
+     *            passed through to {@code parseClassAd(ClassAd, boolean)}
+     * @return {@code null} if the parse fails; otherwise the pooled ClassAd. Note that the pooled
+     *         object is also returned, without being parsed into, when the lexer cannot be
+     *         initialized on the source.
+     * @throws IOException
+     *             propagated from the lexer or the parse
+     */
+    public ClassAd parseClassAd(LexerSource lexer_source, boolean full) throws IOException {
+        System.out.println("Don't use this call. instead, pass a mutable classad instance");
+        ClassAd ad = classAdPool.get();
+        if (!lexer.initialize(lexer_source)) {
+            return ad;
+        }
+        if (!parseClassAd(ad, full)) {
+            return null;
+        }
+        // The lexer swallows one extra character, so if we have
+        // two classads back to back we need to make sure to unread
+        // one of the characters.
+        if (lexer_source.readPreviousCharacter() != '\0') {
+            lexer_source.unreadCharacter();
+        }
+        return ad;
+    }
+
+    /**
+     * Parses a ClassAd from a string into a caller-supplied ClassAd object.
+     *
+     * @param buffer
+     *            Buffer containing the string representation of the classad.
+     * @param classad
+     *            The classad to be populated
+     * @param full
+     *            If this parameter is true, the parse is considered to succeed
+     *            only if the ClassAd was parsed successfully and no other
+     *            tokens follow the ClassAd.
+     * @return true on success, false on failure
+     * @throws IOException
+     *             propagated from the underlying parse
+     */
+    public boolean parseClassAd(String buffer, ClassAd classad, boolean full) throws IOException {
+        return parseClassAd(new StringLexerSource(buffer), classad, full);
+    }
+
+    /**
+     * Parses a ClassAd from {@code buffer} into {@code classad}, starting at the position stored
+     * in {@code offset}.
+     *
+     * @param buffer
+     *            Buffer containing the string representation of the classad.
+     * @param classad
+     *            The classad to be populated
+     * @param offset
+     *            On entry, the position in {@code buffer} at which to start reading; on return it is
+     *            overwritten with the source's current location (whether or not the parse succeeded).
+     * @return true on success, false on failure
+     * @throws IOException
+     *             propagated from the underlying parse
+     */
+    public boolean parseClassAd(String buffer, ClassAd classad, AMutableInt32 offset) throws IOException {
+        int start = offset.getIntegerValue().intValue();
+        StringLexerSource source = new StringLexerSource(buffer, start);
+        boolean parsed = parseClassAd(source, classad);
+        offset.setValue(source.getCurrentLocation());
+        return parsed;
+    }
+
+    /**
+     * Parses the next ClassAd from {@code currentSource} into {@code classad}, without requiring
+     * that the remaining input be fully consumed.
+     *
+     * @param classad
+     *            The classad to be populated
+     * @return true on success, false on failure
+     * @throws IOException
+     *             propagated from the underlying parse
+     */
+    public boolean parseNext(ClassAd classad) throws IOException {
+        final boolean full = false;
+        return parseClassAd(currentSource, classad, full);
+    }
+
+    public boolean parseNext(ClassAd classad, boolean full) throws IOException {
+        return parseClassAd(currentSource, classad, full);
+    }
+
+    private boolean parseClassAd(StringLexerSource lexer_source, ClassAd classad) throws IOException {
+        return parseClassAd(lexer_source, classad, false);
+    }
+
+    /**
+     * Parses a ClassAd read from {@code file} into {@code classad}.
+     * NOTE(review): nothing in this method closes the {@code FileLexerSource}; confirm whether it
+     * holds an open file handle that the caller must release.
+     *
+     * @param file
+     *            the file wrapped in a {@code FileLexerSource}
+     * @param classad
+     *            The classad to be populated
+     * @param full
+     *            passed through to the {@code (LexerSource, ClassAd, boolean)} overload
+     * @return true on success, false on failure
+     * @throws IOException
+     *             propagated from creating the source or from the parse
+     */
+    public boolean parseClassAd(File file, ClassAd classad, boolean full) throws IOException {
+        return parseClassAd(new FileLexerSource(file), classad, full);
+    }
+
+    /**
+     * Parses a ClassAd read from {@code stream} into {@code classad}.
+     *
+     * @param stream
+     *            the stream wrapped in an {@code InputStreamLexerSource}
+     * @param classad
+     *            The classad to be populated
+     * @param full
+     *            passed through to the {@code (LexerSource, ClassAd, boolean)} overload
+     * @return true on success, false on failure
+     * @throws IOException
+     *             propagated from creating the source or from the parse
+     */
+    public boolean parseClassAd(InputStream stream, ClassAd classad, boolean full) throws IOException {
+        return parseClassAd(new InputStreamLexerSource(stream), classad, full);
+    }
+
+    /**
+     * Parses a ClassAd from {@code lexer_source} into {@code classad}. The parse counts as failed
+     * when the lexer cannot be initialized on the source or when the ClassAd itself does not parse;
+     * in either case {@code classad} is cleared.
+     *
+     * @param lexer_source
+     *            the source to read from
+     * @param classad
+     *            The classad to be populated
+     * @param full
+     *            passed through to {@code parseClassAd(ClassAd, boolean)}
+     * @return true on success, false on failure
+     * @throws IOException
+     *             propagated from the lexer or the parse
+     */
+    public boolean parseClassAd(LexerSource lexer_source, ClassAd classad, boolean full) throws IOException {
+        boolean parsed = lexer.initialize(lexer_source) && parseClassAd(classad, full);
+        if (!parsed) {
+            classad.clear();
+            return false;
+        }
+        // The lexer swallows one extra character, so if we have
+        // two classads back to back we need to make sure to unread
+        // one of the characters.
+        if (lexer_source.readPreviousCharacter() != Lexer.EOF) {
+            lexer_source.unreadCharacter();
+        }
+        return true;
+    }
+
+    /**
+     * Parse an expression
+     *
+     * @param buffer
+     *            Buffer containing the string representation of the expression.
+     * @param full
+     *            If this parameter is true, the parse is considered to succeed
+     *            only if the expression was parsed successfully and no other
+     *            tokens are left.
+     * @return pointer to the expression object if successful, or null otherwise
+     */
+    public ExprTree parseExpression(String buffer, boolean full) throws IOException {
+        stringLexerSource.setNewSource(buffer);
+        ExprTreeHolder mutableExpr = mutableExprPool.get();
+        if (lexer.initialize(stringLexerSource)) {
+            parseExpression(mutableExpr, full);
+        }
+        return mutableExpr.getInnerTree();
+    }
+
+    /**
+     * Parses an expression from {@code buffer} without requiring that the whole input be consumed.
+     * Equivalent to {@code parseExpression(buffer, false)}.
+     *
+     * @param buffer
+     *            Buffer containing the string representation of the expression.
+     * @return whatever {@code parseExpression(buffer, false)} returns
+     * @throws IOException
+     *             propagated from the lexer or the parse
+     */
+    public ExprTree ParseExpression(String buffer) throws IOException {
+        final boolean full = false;
+        return parseExpression(buffer, full);
+    }
+
+    public ExprTree parseExpression(LexerSource lexer_source, boolean full) throws IOException {
+        ExprTreeHolder mutableExpr = mutableExprPool.get();
+        if (lexer.initialize(lexer_source)) {
+            parseExpression(mutableExpr, full);
+        }
+        return mutableExpr.getInnerTree();
+    }
+
+    /**
+     * Parses the next expression from the source the lexer is already reading.
+     *
+     * @return {@code null} if the lexer was never initialized; otherwise the inner tree of the
+     *         pooled expression holder after a non-full parse attempt
+     * @throws IOException
+     *             propagated from the lexer or the parse
+     */
+    public ExprTree parseNextExpression() throws IOException {
+        if (!lexer.wasInitialized()) {
+            return null;
+        }
+        ExprTreeHolder holder = mutableExprPool.get();
+        parseExpression(holder, false);
+        return holder.getInnerTree();
+    }
+
+    /*--------------------------------------------------------------------
+    *
+    * Private Functions
+    *
+    *-------------------------------------------------------------------*/
+
+    // Expression .= LogicalORExpression
+    // | LogicalORExpression '?' Expression ':' Expression
+
+    private boolean parseExpression(ExprTreeHolder tree) throws IOException {
+        return parseExpression(tree, false);
+    }
+
+    /**
+     * Parses an expression: a logical-OR expression optionally followed by
+     * {@code '?' Expression ':' Expression}. The result replaces the inner tree of {@code tree}.
+     *
+     * When {@code full} is true the token following the expression must be end-of-input. This is
+     * now also enforced for conditional ({@code ?:}) expressions, which previously returned before
+     * the check was reached.
+     *
+     * @param tree
+     *            holder that receives the parsed expression; its inner tree is set to null when a
+     *            conditional expression has a missing operand
+     * @param full
+     *            if true, require that the input is exhausted after the expression
+     * @return true if an expression was parsed, false otherwise
+     * @throws IOException
+     *             a {@code HyracksDataException} is thrown when the ':' of a conditional is missing
+     *             or when a full parse is followed by further tokens; other failures are propagated
+     *             from the lexer
+     */
+    private boolean parseExpression(ExprTreeHolder tree, boolean full) throws IOException {
+        if (!parseLogicalORExpression(tree)) {
+            return false;
+        }
+        if (lexer.peekToken() == TokenType.LEX_QMARK) {
+            lexer.consumeToken();
+            ExprTreeHolder treeM = mutableExprPool.get();
+            ExprTreeHolder treeR = mutableExprPool.get();
+            parseExpression(treeM);
+            TokenType separator = lexer.consumeToken();
+            if (separator != TokenType.LEX_COLON) {
+                throw new HyracksDataException("expected LEX_COLON, but got " + Lexer.strLexToken(separator));
+            }
+            parseExpression(treeR);
+            if (tree.getInnerTree() == null || treeM.getInnerTree() == null || treeR.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            // tree still holds the condition at this point; it is replaced by the ternary below
+            Operation newTree = operationPool.get();
+            Operation.createOperation(Operation.OpKind_TERNARY_OP, tree, treeM, treeR, newTree);
+            tree.setInnerTree(newTree);
+        }
+        // if a full parse was requested, ensure that input is exhausted
+        if (full) {
+            TokenType trailing = lexer.consumeToken();
+            if (trailing != TokenType.LEX_END_OF_INPUT) {
+                throw new HyracksDataException("expected LEX_END_OF_INPUT on full parse, but got "
+                        + String.valueOf(Lexer.strLexToken(trailing)));
+            }
+        }
+        return true;
+    }
+
+    // LogicalORExpression .= LogicalANDExpression
+    // | LogicalORExpression '||' LogicalANDExpression
+
+    /**
+     * Parses a left-associative chain of logical-AND expressions joined by {@code ||}. Each
+     * operator folds the tree built so far and the newly parsed right operand into a new
+     * operation that becomes the inner tree of {@code tree}.
+     *
+     * @return false (with the inner tree of {@code tree} set to null when an operand of
+     *         {@code ||} is missing) if parsing fails, true otherwise
+     */
+    private boolean parseLogicalORExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseLogicalANDExpression(tree)) {
+            return false;
+        }
+        while (lexer.peekToken() == TokenType.LEX_LOGICAL_OR) {
+            ExprTreeHolder rhs = mutableExprPool.get();
+            lexer.consumeToken();
+            parseLogicalANDExpression(rhs);
+            if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            Operation combined = operationPool.get();
+            Operation.createOperation(Operation.OpKind_LOGICAL_OR_OP, tree, rhs, null, combined);
+            tree.setInnerTree(combined);
+        }
+        return true;
+    }
+
+    // LogicalANDExpression .= InclusiveORExpression
+    // | LogicalANDExpression '&&' InclusiveORExpression
+    /**
+     * Parses a left-associative chain of inclusive-OR expressions joined by {@code &&}. Each
+     * operator folds the tree built so far and the newly parsed right operand into a new
+     * operation that becomes the inner tree of {@code tree}.
+     *
+     * @return false (with the inner tree of {@code tree} set to null when an operand of
+     *         {@code &&} is missing) if parsing fails, true otherwise
+     */
+    private boolean parseLogicalANDExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseInclusiveORExpression(tree)) {
+            return false;
+        }
+        while (lexer.peekToken() == TokenType.LEX_LOGICAL_AND) {
+            ExprTreeHolder rhs = mutableExprPool.get();
+            lexer.consumeToken();
+            parseInclusiveORExpression(rhs);
+            if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            Operation combined = operationPool.get();
+            Operation.createOperation(Operation.OpKind_LOGICAL_AND_OP, tree, rhs, null, combined);
+            tree.setInnerTree(combined);
+        }
+        return true;
+    }
+
+    // InclusiveORExpression .= ExclusiveORExpression
+    // | InclusiveORExpression '|' ExclusiveORExpression
+    /**
+     * Parses a left-associative chain of exclusive-OR expressions joined by {@code |}. Each
+     * operator folds the tree built so far and the newly parsed right operand into a new
+     * operation that becomes the inner tree of {@code tree}.
+     *
+     * @param tree
+     *            holder that receives the parsed expression
+     * @return false (with the inner tree of {@code tree} set to null when an operand of
+     *         {@code |} is missing) if parsing fails, true otherwise
+     * @throws IOException
+     *             propagated from the lexer or nested parse calls
+     */
+    public boolean parseInclusiveORExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseExclusiveORExpression(tree)) {
+            return false;
+        }
+        while (lexer.peekToken() == TokenType.LEX_BITWISE_OR) {
+            ExprTreeHolder rhs = mutableExprPool.get();
+            lexer.consumeToken();
+            parseExclusiveORExpression(rhs);
+            if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            Operation combined = operationPool.get();
+            Operation.createOperation(Operation.OpKind_BITWISE_OR_OP, tree, rhs, null, combined);
+            tree.setInnerTree(combined);
+        }
+        return true;
+    }
+
+    // ExclusiveORExpression .= ANDExpression
+    // | ExclusiveORExpression '^' ANDExpression
+    /**
+     * Parses a left-associative chain of bitwise-AND expressions joined by {@code ^}. Each
+     * operator folds the tree built so far and the newly parsed right operand into a new
+     * operation that becomes the inner tree of {@code tree}.
+     *
+     * @return false (with the inner tree of {@code tree} set to null when an operand of
+     *         {@code ^} is missing) if parsing fails, true otherwise
+     */
+    private boolean parseExclusiveORExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseANDExpression(tree)) {
+            return false;
+        }
+        while (lexer.peekToken() == TokenType.LEX_BITWISE_XOR) {
+            lexer.consumeToken();
+            ExprTreeHolder rhs = mutableExprPool.get();
+            parseANDExpression(rhs);
+            if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            Operation combined = operationPool.get();
+            Operation.createOperation(Operation.OpKind_BITWISE_XOR_OP, tree, rhs, null, combined);
+            tree.setInnerTree(combined);
+        }
+        return true;
+    }
+
+    // ANDExpression .= EqualityExpression
+    // | ANDExpression '&' EqualityExpression
+    /**
+     * Parses a left-associative chain of equality expressions joined by {@code &}. Each
+     * operator folds the tree built so far and the newly parsed right operand into a new
+     * operation that becomes the inner tree of {@code tree}.
+     *
+     * @return false (with the inner tree of {@code tree} set to null when an operand of
+     *         {@code &} is missing) if parsing fails, true otherwise
+     */
+    private boolean parseANDExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseEqualityExpression(tree)) {
+            return false;
+        }
+        while (lexer.peekToken() == TokenType.LEX_BITWISE_AND) {
+            ExprTreeHolder rhs = mutableExprPool.get();
+            lexer.consumeToken();
+            parseEqualityExpression(rhs);
+            if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            Operation combined = operationPool.get();
+            Operation.createOperation(Operation.OpKind_BITWISE_AND_OP, tree, rhs, null, combined);
+            tree.setInnerTree(combined);
+        }
+        return true;
+    }
+
+    // EqualityExpression .= RelationalExpression
+    // | EqualityExpression '==' RelationalExpression
+    // | EqualityExpression '!=' RelationalExpression
+    // | EqualityExpression '=?=' RelationalExpression
+    // | EqualityExpression '=!=' RelationalExpression
+    private boolean parseEqualityExpression(ExprTreeHolder tree) throws IOException {
+        TokenType tt;
+        int op = Operation.OpKind_NO_OP;
+        if (!parseRelationalExpression(tree))
+            return false;
+        tt = lexer.peekToken();
+        while (tt == TokenType.LEX_EQUAL || tt == TokenType.LEX_NOT_EQUAL || tt == TokenType.LEX_META_EQUAL
+                || tt == TokenType.LEX_META_NOT_EQUAL) {
+            ExprTreeHolder treeL = tree;
+            ExprTreeHolder treeR = mutableExprPool.get();
+            lexer.consumeToken();
+            parseRelationalExpression(treeR);
+            switch (tt) {
+                case LEX_EQUAL:
+                    op = Operation.OpKind_EQUAL_OP;
+                    break;
+                case LEX_NOT_EQUAL:
+                    op = Operation.OpKind_NOT_EQUAL_OP;
+                    break;
+                case LEX_META_EQUAL:
+                    op = Operation.OpKind_META_EQUAL_OP;
+                    break;
+                case LEX_META_NOT_EQUAL:
+                    op = Operation.OpKind_META_NOT_EQUAL_OP;
+                    break;
+                default:
+                    throw new HyracksDataException("ClassAd:  Should not reach here");
+            }
+            if (treeL.getInnerTree() != null && treeR.getInnerTree() != null) {
+                Operation newTree = operationPool.get();
+                Operation.createOperation(op, treeL, treeR, null, newTree);
+                tree.setInnerTree(newTree);
+            } else {
+                tree.setInnerTree(null);
+                return false;
+            }
+            tt = lexer.peekToken();
+        }
+        return true;
+    }
+
+    // RelationalExpression .= ShiftExpression
+    // | RelationalExpression '<' ShiftExpression
+    // | RelationalExpression '>' ShiftExpression
+    // | RelationalExpression '<=' ShiftExpression
+    // | RelationalExpression '>=' ShiftExpression
+    private boolean parseRelationalExpression(ExprTreeHolder tree) throws IOException {
+        TokenType tt;
+        if (!parseShiftExpression(tree))
+            return false;
+        tt = lexer.peekToken();
+        while (tt == TokenType.LEX_LESS_THAN || tt == TokenType.LEX_GREATER_THAN || tt == TokenType.LEX_LESS_OR_EQUAL
+                || tt == TokenType.LEX_GREATER_OR_EQUAL) {
+            int op = Operation.OpKind_NO_OP;
+            ExprTreeHolder treeL = tree;
+            ExprTreeHolder treeR = mutableExprPool.get();
+            lexer.consumeToken();
+            parseShiftExpression(treeR);
+            switch (tt) {
+                case LEX_LESS_THAN:
+                    op = Operation.OpKind_LESS_THAN_OP;
+                    break;
+                case LEX_LESS_OR_EQUAL:
+                    op = Operation.OpKind_LESS_OR_EQUAL_OP;
+                    break;
+                case LEX_GREATER_THAN:
+                    op = Operation.OpKind_GREATER_THAN_OP;
+                    break;
+                case LEX_GREATER_OR_EQUAL:
+                    op = Operation.OpKind_GREATER_OR_EQUAL_OP;
+                    break;
+                default:
+                    throw new HyracksDataException("ClassAd:  Should not reach here");
+            }
+            if (treeL.getInnerTree() != null && treeR.getInnerTree() != null) {
+                Operation newTree = operationPool.get();
+                Operation.createOperation(op, treeL, treeR, null, newTree);
+                tree.setInnerTree(newTree);
+            } else {
+                tree.setInnerTree(null);
+                return false;
+            }
+            tt = lexer.peekToken();
+        }
+        return true;
+    }
+
+    // ShiftExpression .= AdditiveExpression
+    // | ShiftExpression '<<' AdditiveExpression
+    // | ShiftExpression '>>' AdditiveExpression
+    // | ShiftExpression '>>>' AdditiveExpression
+    private boolean parseShiftExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseAdditiveExpression(tree))
+            return false;
+
+        TokenType tt = lexer.peekToken();
+        while (tt == TokenType.LEX_LEFT_SHIFT || tt == TokenType.LEX_RIGHT_SHIFT || tt == TokenType.LEX_URIGHT_SHIFT) {
+            ExprTreeHolder treeL = tree;
+            ExprTreeHolder treeR = mutableExprPool.get();
+            int op;
+            lexer.consumeToken();
+            parseAdditiveExpression(treeR);
+            switch (tt) {
+                case LEX_LEFT_SHIFT:
+                    op = Operation.OpKind_LEFT_SHIFT_OP;
+                    break;
+                case LEX_RIGHT_SHIFT:
+                    op = Operation.OpKind_RIGHT_SHIFT_OP;
+                    break;
+                case LEX_URIGHT_SHIFT:
+                    op = Operation.OpKind_URIGHT_SHIFT_OP;
+                    break;
+                default:
+                    op = Operation.OpKind_NO_OP; // Make gcc's -wuninitalized happy
+                    throw new HyracksDataException("ClassAd:  Should not reach here");
+            }
+
+            if (treeL.getInnerTree() != null && treeR.getInnerTree() != null) {
+                Operation newTree = operationPool.get();
+                Operation.createOperation(op, treeL, treeR, null, newTree);
+                tree.setInnerTree(newTree);
+            } else {
+                tree.setInnerTree(null);
+                return false;
+            }
+            tt = lexer.peekToken();
+        }
+        return true;
+    }
+
+    // AdditiveExpression .= MultiplicativeExpression
+    // | AdditiveExpression '+' MultiplicativeExpression
+    // | AdditiveExpression '-' MultiplicativeExpression
+    /**
+     * Parses a left-associative chain of multiplicative expressions joined by {@code +} or
+     * {@code -}. Each operator folds the tree built so far and the newly parsed right operand
+     * into a new operation that becomes the inner tree of {@code tree}.
+     *
+     * @return false (with the inner tree of {@code tree} set to null when an operand is missing)
+     *         if parsing fails, true otherwise
+     */
+    private boolean parseAdditiveExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseMultiplicativeExpression(tree)) {
+            return false;
+        }
+        for (TokenType tt = lexer.peekToken(); tt == TokenType.LEX_PLUS
+                || tt == TokenType.LEX_MINUS; tt = lexer.peekToken()) {
+            ExprTreeHolder rhs = mutableExprPool.get();
+            lexer.consumeToken();
+            parseMultiplicativeExpression(rhs);
+            if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                tree.setInnerTree(null);
+                return false;
+            }
+            int op;
+            if (tt == TokenType.LEX_PLUS) {
+                op = Operation.OpKind_ADDITION_OP;
+            } else {
+                op = Operation.OpKind_SUBTRACTION_OP;
+            }
+            Operation combined = operationPool.get();
+            Operation.createOperation(op, tree, rhs, null, combined);
+            tree.setInnerTree(combined);
+        }
+        return true;
+    }
+
+    // MultiplicativeExpression .= UnaryExpression
+    // | MultiplicativeExpression '*' UnaryExpression
+    // | MultiplicativeExpression '/' UnaryExpression
+    // | MultiplicativeExpression '%' UnaryExpression
+    private boolean parseMultiplicativeExpression(ExprTreeHolder tree) throws IOException {
+        if (!parseUnaryExpression(tree))
+            return false;
+
+        TokenType tt = lexer.peekToken();
+        while (tt == TokenType.LEX_MULTIPLY || tt == TokenType.LEX_DIVIDE || tt == TokenType.LEX_MODULUS) {
+            ExprTreeHolder treeL = tree;
+            ExprTreeHolder treeR = mutableExprPool.get();
+            int op;
+            lexer.consumeToken();
+            parseUnaryExpression(treeR);
+            switch (tt) {
+                case LEX_MULTIPLY:
+                    op = Operation.OpKind_MULTIPLICATION_OP;
+                    break;
+                case LEX_DIVIDE:
+                    op = Operation.OpKind_DIVISION_OP;
+                    break;
+                case LEX_MODULUS:
+                    op = Operation.OpKind_MODULUS_OP;
+                    break;
+                default:
+                    op = Operation.OpKind_NO_OP; // Make gcc's -wuninitalized happy
+                    throw new HyracksDataException("ClassAd:  Should not reach here");
+            }
+            if (treeL.getInnerTree() != null && treeR.getInnerTree() != null) {
+                Operation newTree = operationPool.get();
+                Operation.createOperation(op, treeL, treeR, null, newTree);
+                tree.setInnerTree(newTree);
+            } else {
+                tree.setInnerTree(null);
+                return false;
+            }
+            tt = lexer.peekToken();
+        }
+        return true;
+    }
+
+    // UnaryExpression .= PostfixExpression
+    // | UnaryOperator UnaryExpression
+    // ( where UnaryOperator is one of { -, +, ~, ! } )
+    private boolean parseUnaryExpression(ExprTreeHolder tree) throws IOException {
+        TokenType tt = lexer.peekToken();
+        if (tt == TokenType.LEX_MINUS || tt == TokenType.LEX_PLUS || tt == TokenType.LEX_BITWISE_NOT
+                || tt == TokenType.LEX_LOGICAL_NOT) {
+            lexer.consumeToken();
+            ExprTreeHolder treeM = mutableExprPool.get();
+            int op = Operation.OpKind_NO_OP;
+            parseUnaryExpression(treeM);
+            switch (tt) {
+                case LEX_MINUS:
+                    op = Operation.OpKind_UNARY_MINUS_OP;
+                    break;
+                case LEX_PLUS:
+                    op = Operation.OpKind_UNARY_PLUS_OP;
+                    break;
+                case LEX_BITWISE_NOT:
+                    op = Operation.OpKind_BITWISE_NOT_OP;
+                    break;
+                case LEX_LOGICAL_NOT:
+                    op = Operation.OpKind_LOGICAL_NOT_OP;
+                    break;
+                default:
+                    throw new HyracksDataException("ClassAd: Shouldn't Get here");
+            }
+            if (treeM.getInnerTree() != null) {
+                Operation newTree = operationPool.get();
+                Operation.createOperation(op, treeM, null, null, newTree);
+                tree.setInnerTree(newTree);
+            } else {
+                tree.setInnerTree(null);
+                return (false);
+            }
+            return true;
+        } else {
+            return parsePostfixExpression(tree);
+        }
+    }
+
+    // PostfixExpression .= PrimaryExpression
+    // | PostfixExpression '.' Identifier
+    // | PostfixExpression '[' Expression ']'
+    /**
+     * Parses a primary expression followed by any number of postfix operators: a subscript
+     * ({@code '[' Expression ']'}) or a field selection ({@code '.' Identifier}). Each postfix
+     * operator wraps the tree built so far, and the result becomes the inner tree of {@code tree}.
+     *
+     * @return false if the primary expression fails, or (with the inner tree of {@code tree} set
+     *         to null) if a subscript has a missing operand or is not closed by ']'; true otherwise
+     * @throws IOException
+     *             a {@code HyracksDataException} is thrown when the token after a selection '.' is
+     *             not an identifier; other failures are propagated from the lexer
+     */
+    private boolean parsePostfixExpression(ExprTreeHolder tree) throws IOException {
+        if (!parsePrimaryExpression(tree)) {
+            return false;
+        }
+        TokenType tt = lexer.peekToken();
+        while (tt == TokenType.LEX_OPEN_BOX || tt == TokenType.LEX_SELECTION) {
+            ExprTreeHolder rhs = mutableExprPool.get();
+            TokenValue tv = tokenValuePool.get();
+            lexer.consumeToken();
+            if (tt == TokenType.LEX_OPEN_BOX) {
+                // subscript operation
+                parseExpression(rhs);
+                if (tree.getInnerTree() == null || rhs.getInnerTree() == null) {
+                    tree.setInnerTree(null);
+                    return false;
+                }
+                Operation subscript = operationPool.get();
+                Operation.createOperation(Operation.OpKind_SUBSCRIPT_OP, tree, rhs, null, subscript);
+                if (lexer.consumeToken() != TokenType.LEX_CLOSE_BOX) {
+                    tree.setInnerTree(null);
+                    return false;
+                }
+                tree.setInnerTree(subscript);
+            } else {
+                // field selection operation (the only other token that enters the loop)
+                tt = lexer.consumeToken(tv);
+                if (tt != TokenType.LEX_IDENTIFIER) {
+                    throw new HyracksDataException("second argument of selector must be an identifier (got"
+                            + String.valueOf(Lexer.strLexToken(tt)) + ")");
+                }
+                AttributeReference selection = attrRefPool.get();
+                AttributeReference.createAttributeReference(tree, tv.getStrValue(), false, selection);
+                tree.setInnerTree(selection);
+            }
+            tt = lexer.peekToken();
+        }
+        return true;
+    }
+
+    // PrimaryExpression .= Identifier
+    // | FunctionCall
+    // | '.' Identifier
+    // | '(' Expression ')'
+    // | Literal
+    // FunctionCall .= Identifier ArgumentList
+    // ( Constant may be
+    // boolean,undefined,error,string,integer,real,classad,list )
+    // ( ArgumentList non-terminal includes parentheses )
+    /**
+     * Parses a primary expression and stores it as the inner tree of {@code tree}. The inner tree
+     * is cleared first, then the next token selects the form:
+     * an identifier (attribute reference, or a function call when followed by '('), an absolute
+     * attribute reference ('.' Identifier), a parenthesized expression, a nested ClassAd ('['),
+     * an expression list ('{'), or a literal (undefined, error, boolean, integer, real, string,
+     * absolute time, relative time).
+     *
+     * The {@code isExpr} flag is set to true for every form except the plain literals.
+     *
+     * @param tree
+     *            holder that receives the parsed expression
+     * @return true if a primary expression was parsed; false otherwise (any other token leaves the
+     *         inner tree null)
+     * @throws IOException
+     *             a {@code HyracksDataException} is thrown when '.' is not followed by an identifier
+     *             or when a parenthesized expression is not closed by ')'; other failures are
+     *             propagated from the lexer or nested parse calls
+     */
+    private boolean parsePrimaryExpression(ExprTreeHolder tree) throws IOException {
+        ExprTreeHolder treeL;
+        TokenValue tv = tokenValuePool.get();
+        TokenType tt;
+        tree.setInnerTree(null);
+        switch ((tt = lexer.peekToken(tv))) {
+            // identifiers
+            case LEX_IDENTIFIER:
+                isExpr = true;
+                lexer.consumeToken();
+                // check for function call
+                if ((tt = lexer.peekToken()) == TokenType.LEX_OPEN_PAREN) {
+                    ExprList argList = exprListPool.get();
+                    if (!parseArgumentList(argList)) {
+                        tree.setInnerTree(null);
+                        return false;
+                    }
+                    // special case function-calls should be converted
+                    // into a literal expression if the argument is a
+                    // string literal
+                    if (shouldEvaluateAtParseTime(tv.getStrValue().toString(), argList)) {
+                        tree.setInnerTree(evaluateFunction(tv.getStrValue().toString(), argList));
+                    } else {
+                        tree.setInnerTree(FunctionCall.createFunctionCall(tv.getStrValue().toString(), argList));
+                    }
+                } else {
+                    // I don't think this is ever hit
+                    tree.setInnerTree(AttributeReference.createAttributeReference(null, tv.getStrValue(), false));
+                }
+                return (tree.getInnerTree() != null);
+            case LEX_SELECTION:
+                isExpr = true;
+                lexer.consumeToken();
+                if ((tt = lexer.consumeToken(tv)) == TokenType.LEX_IDENTIFIER) {
+                    // the boolean final arg signifies that reference is absolute
+                    tree.setInnerTree(AttributeReference.createAttributeReference(null, tv.getStrValue(), true));
+                    // NOTE(review): this branch tests tree.size() while the literal branches test
+                    // getInnerTree() != null -- confirm the two checks are equivalent here.
+                    return (tree.size() != 0);
+                }
+                // not an identifier following the '.'
+                throw new HyracksDataException(
+                        "need identifier in selection expression (got" + Lexer.strLexToken(tt) + ")");
+                // parenthesized expression
+            case LEX_OPEN_PAREN: {
+                isExpr = true;
+                lexer.consumeToken();
+                treeL = mutableExprPool.get();
+                parseExpression(treeL);
+                if (treeL.getInnerTree() == null) {
+                    tree.resetExprTree(null);
+                    return false;
+                }
+
+                if ((tt = lexer.consumeToken()) != TokenType.LEX_CLOSE_PAREN) {
+                    throw new HyracksDataException("expected LEX_CLOSE_PAREN, but got " + Lexer.strLexToken(tt));
+                }
+                // assume make operation will return a new tree
+                tree.setInnerTree(Operation.createOperation(Operation.OpKind_PARENTHESES_OP, treeL));
+                return (tree.size() != 0);
+            }
+                // constants
+            case LEX_OPEN_BOX: {
+                // The '[' is not consumed here; parseClassAd consumes it itself.
+                isExpr = true;
+                ClassAd newAd = classAdPool.get();
+                if (!parseClassAd(newAd)) {
+                    tree.resetExprTree(null);
+                    return false;
+                }
+                tree.setInnerTree(newAd);
+                return true;
+            }
+
+            case LEX_OPEN_BRACE: {
+                isExpr = true;
+                ExprList newList = exprListPool.get();
+                if (!parseExprList(newList)) {
+                    tree.setInnerTree(null);
+                    return false;
+                }
+                tree.setInnerTree(newList);
+                return true;
+            }
+
+            case LEX_UNDEFINED_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setUndefinedValue();
+                tree.setInnerTree(Literal.createLiteral(val));
+                return (tree.getInnerTree() != null);
+            }
+            case LEX_ERROR_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setErrorValue();
+                tree.setInnerTree(Literal.createLiteral(val));
+                return (tree.getInnerTree() != null);
+            }
+            case LEX_BOOLEAN_VALUE: {
+                Value val = valuePool.get();
+                MutableBoolean b = new MutableBoolean();
+                tv.getBoolValue(b);
+                lexer.consumeToken();
+                val.setBooleanValue(b);
+                tree.setInnerTree(Literal.createLiteral(val));
+                return (tree.getInnerTree() != null);
+            }
+
+            case LEX_INTEGER_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setIntegerValue(tv.getIntValue());
+                tree.setInnerTree(Literal.createLiteral(val, tv.getFactor()));
+                return (tree.getInnerTree() != null);
+            }
+
+            case LEX_REAL_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setRealValue(tv.getRealValue());
+                tree.setInnerTree(Literal.createLiteral(val, tv.getFactor()));
+                return (tree.getInnerTree() != null);
+            }
+
+            case LEX_STRING_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setStringValue(tv.getStrValue());
+                tree.setInnerTree(Literal.createLiteral(val));
+                return (tree.getInnerTree() != null);
+            }
+
+            case LEX_ABSOLUTE_TIME_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setAbsoluteTimeValue(tv.getTimeValue());
+                tree.setInnerTree(Literal.createLiteral(val));
+                return (tree.getInnerTree() != null);
+            }
+
+            case LEX_RELATIVE_TIME_VALUE: {
+                Value val = valuePool.get();
+                lexer.consumeToken();
+                val.setRelativeTimeValue(tv.getTimeValue().getRelativeTime());
+                tree.setInnerTree(Literal.createLiteral(val));
+                return (tree.getInnerTree() != null);
+            }
+
+            default:
+                tree.setInnerTree(null);
+                return false;
+        }
+    }
+
+    // ArgumentList .= '(' ListOfArguments ')'
+    // ListOfArguments .= (epsilon)
+    // | ListOfArguments ',' Expression
+    public boolean parseArgumentList(ExprList argList) throws IOException {
+        TokenType tt;
+        argList.clear();
+        if ((tt = lexer.consumeToken()) != TokenType.LEX_OPEN_PAREN) {
+            throw new HyracksDataException("expected LEX_OPEN_PAREN but got " + String.valueOf(Lexer.strLexToken(tt)));
+            // return false;
+        }
+        tt = lexer.peekToken();
+        ExprTreeHolder tree = mutableExprPool.get();
+        while (tt != TokenType.LEX_CLOSE_PAREN) {
+            // parse the expression
+            tree.reset();
+            parseExpression(tree);
+            if (tree.getInnerTree() == null) {
+                argList.clear();
+                return false;
+            }
+            // insert the expression into the argument list
+            argList.add(tree.getInnerTree());
+            // the next token must be a ',' or a ')'
+            // or it can be a ';' if using old ClassAd semantics
+            tt = lexer.peekToken();
+            if (tt == TokenType.LEX_COMMA || (tt == TokenType.LEX_SEMICOLON && false))
+                lexer.consumeToken();
+            else if (tt != TokenType.LEX_CLOSE_PAREN) {
+                argList.clear();
+                throw new HyracksDataException(
+                        "expected LEX_COMMA or LEX_CLOSE_PAREN but got " + String.valueOf(Lexer.strLexToken(tt)));
+                // return false;
+            }
+        }
+        lexer.consumeToken();
+        return true;
+    }
+
+    // ClassAd .= '[' AttributeList ']'
+    // AttributeList .= (epsilon)
+    // | Attribute ';' AttributeList
+    // Attribute .= Identifier '=' Expression
+    /**
+     * Parses a ClassAd from the lexer into {@code ad} without requiring that the input be
+     * exhausted afterwards; delegates to {@link #parseClassAd(ClassAd, boolean)} with
+     * {@code full == false}.
+     *
+     * @param ad
+     *            the ClassAd to populate
+     * @return the result of the delegated parse
+     * @throws IOException
+     *             propagated from the delegated parse
+     */
+    public boolean parseClassAd(ClassAd ad) throws IOException {
+        return parseClassAd(ad, false);
+    }
+
+    /**
+     * Stub: ignores both arguments and always returns {@code false}.
+     * NOTE(review): presumably a placeholder for old-format ClassAd parsing - confirm before use.
+     *
+     * @param ad
+     *            unused
+     * @param full
+     *            unused
+     * @return always {@code false}
+     */
+    public boolean parseClassAdOld(ClassAd ad, boolean full) throws IOException {
+        return false;
+    }
+
+    /**
+     * Parses a bracketed ClassAd ({@code '[' AttributeList ']'}) from the lexer into {@code ad}.
+     * {@code ad} is cleared first. Each attribute has the form {@code Identifier '=' Expression};
+     * attributes are separated by ';' and redundant semicolons are tolerated anywhere between
+     * the brackets.
+     *
+     * @param ad
+     *            the ClassAd to populate
+     * @param full
+     *            when {@code true}, the token following the closing ']' must be end of input
+     * @return {@code false} when the first token is not '[', {@code true} on a successful parse
+     * @throws IOException
+     *             (as {@link HyracksDataException}) on any syntax error, when an expression
+     *             yields no tree, when an attribute cannot be inserted, or when {@code full} is
+     *             set and input remains after the closing ']'
+     */
+    public boolean parseClassAd(ClassAd ad, boolean full) throws IOException {
+        TokenType tt;
+        ad.clear();
+        if ((tt = lexer.consumeToken()) != TokenType.LEX_OPEN_BOX) {
+            return false;
+        }
+        tt = lexer.peekToken();
+        TokenValue tv = tokenValuePool.get();
+        ExprTreeHolder tree = mutableExprPool.get();
+        while (tt != TokenType.LEX_CLOSE_BOX) {
+            // Get the name of the expression
+            tv.reset();
+            tree.reset();
+            tt = lexer.consumeToken(tv);
+            if (tt == TokenType.LEX_SEMICOLON) {
+                // We allow empty expressions, so if someone gives a double semicolon, it doesn't
+                // hurt. Technically it's not right, but we shouldn't make users pay the price
+                // for a meaningless mistake. See condor-support #1881 for a user that was
+                // bitten by this.
+                // Re-peek so that a ']' directly after the stray ';' ends the loop instead of
+                // being consumed as if it were an attribute name.
+                tt = lexer.peekToken();
+                continue;
+            }
+            if (tt != TokenType.LEX_IDENTIFIER) {
+                throw new HyracksDataException(
+                        "while parsing classad:  expected LEX_IDENTIFIER " + " but got " + Lexer.strLexToken(tt));
+            }
+
+            // consume the intermediate '='
+            if ((tt = lexer.consumeToken()) != TokenType.LEX_BOUND_TO) {
+                throw new HyracksDataException(
+                        "while parsing classad:  expected LEX_BOUND_TO " + " but got " + Lexer.strLexToken(tt));
+            }
+
+            isExpr = false;
+            // parse the expression
+            parseExpression(tree);
+            if (tree.getInnerTree() == null) {
+                throw new HyracksDataException("parse expression returned empty tree");
+            }
+
+            // insert the attribute into the classad
+            if (!ad.insert(tv.getStrValue().toString(), tree)) {
+                throw new HyracksDataException("Couldn't insert value to classad");
+            }
+
+            // the next token must be a ';' or a ']'
+            tt = lexer.peekToken();
+            if (tt != TokenType.LEX_SEMICOLON && tt != TokenType.LEX_CLOSE_BOX) {
+                throw new HyracksDataException("while parsing classad:  expected LEX_SEMICOLON or "
+                        + "LEX_CLOSE_BOX but got " + Lexer.strLexToken(tt));
+            }
+
+            // Slurp up any extra semicolons. This does not duplicate the work at the top of the
+            // loop because it accounts for the case where the last expression has extra
+            // semicolons, while the first case accounts for optional beginning semicolons.
+            while (tt == TokenType.LEX_SEMICOLON) {
+                lexer.consumeToken();
+                tt = lexer.peekToken();
+            }
+        }
+        // consume the closing ']'
+        lexer.consumeToken();
+        // If a full parse was requested, ensure that input is exhausted. The consumed token is
+        // captured so the error reports what was actually found rather than the stale ']'.
+        if (full && ((tt = lexer.consumeToken()) != TokenType.LEX_END_OF_INPUT)) {
+            throw new HyracksDataException("while parsing classad:  expected LEX_END_OF_INPUT for "
+                    + "full parse but got " + Lexer.strLexToken(tt));
+        }
+        return true;
+    }
+
+    // ExprList .= '{' ListOfExpressions '}'
+    // ListOfExpressions .= (epsilon)
+    // | Expression ',' ListOfExpressions
+    /**
+     * Parses a brace-delimited expression list into {@code list} without requiring that the
+     * input be exhausted afterwards; delegates to {@link #parseExprList(ExprList, boolean)}
+     * with {@code full == false}.
+     *
+     * @param list
+     *            receives the parsed expressions
+     * @return the result of the delegated parse
+     * @throws IOException
+     *             propagated from the delegated parse
+     */
+    public boolean parseExprList(ExprList list) throws IOException {
+        return parseExprList(list, false);
+    }
+
+    public boolean parseExprList(ExprList list, boolean full) throws IOException {
+        TokenType tt;
+        ExprTreeHolder tree = new ExprTreeHolder();
+        ExprList loe = new ExprList();
+
+        if ((tt = lexer.consumeToken()) != TokenType.LEX_OPEN_BRACE) {
+            throw new HyracksDataException(
+                    "while parsing expression list:  expected LEX_OPEN_BRACE" + " but got " + Lexer.strLexToken(tt));
+            // return false;
+        }
+        tt = lexer.peekToken();
+        while (tt != TokenType.LEX_CLOSE_BRACE) {
+            // parse the expression
+            parseExpression(tree);
+            if (tree.getInnerTree() == null) {
+                throw new HyracksDataException("while parsing expression list:  expected "
+                        + "LEX_CLOSE_BRACE or LEX_COMMA but got " + Lexer.strLexToken(tt));
+            }
+
+            // insert the expression into the list
+            loe.add(tree);
+
+            // the next token must be a ',' or a '}'
+            tt = lexer.peekToken();
+            if (tt == TokenType.LEX_COMMA)
+                lexer.consumeToken();
+            else if (tt != TokenType.LEX_CLOSE_BRACE) {
+                throw new HyracksDataException("while parsing expression list:  expected "
+                        + "LEX_CLOSE_BRACE or LEX_COMMA but got " + Lexer.strLexToken(tt));
+            }
+        }
+
+        lexer.consumeToken();
+        list.setValue(ExprList.createExprList(loe));
+
+        // if a full parse was requested, ensure that input is exhausted
+        if (full && (lexer.consumeToken() != TokenType.LEX_END_OF_INPUT)) {
+            list.clear();
+            throw new HyracksDataException("while parsing expression list:  expected "
+                    + "LEX_END_OF_INPUT for full parse but got " + Lexer.strLexToken(tt));
+        }
+        return true;
+    }
+
+    /**
+     * Decides whether a function call should be evaluated while parsing. That is the case
+     * only for {@code absTime} / {@code relTime} (names compared case-insensitively) invoked
+     * with exactly one argument that is a literal node holding a string value.
+     *
+     * @param functionName
+     *            name of the called function
+     * @param argList
+     *            the already parsed arguments of the call
+     * @return {@code true} when the call qualifies, {@code false} otherwise
+     */
+    public boolean shouldEvaluateAtParseTime(String functionName, ExprList argList) throws HyracksDataException {
+        boolean timeFunction =
+                functionName.equalsIgnoreCase("absTime") || functionName.equalsIgnoreCase("relTime");
+        if (!timeFunction) {
+            return false;
+        }
+        if (argList.size() != 1 || argList.get(0).getKind() != NodeKind.LITERAL_NODE) {
+            return false;
+        }
+        // unpack the single literal argument to inspect the kind of value it carries
+        Value literalValue = new Value();
+        AMutableNumberFactor literalFactor = new AMutableNumberFactor();
+        ((Literal) argList.get(0)).getComponents(literalValue, literalFactor);
+        return literalValue.isStringValue();
+    }
+
+    public ExprTree evaluateFunction(String functionName, ExprList argList) throws HyracksDataException {
+        Value val = new Value();
+        AMutableNumberFactor factor = new AMutableNumberFactor();
+        ExprTreeHolder tree = new ExprTreeHolder();
+        ((Literal) argList.get(0)).getComponents(val, factor);
+
+        AMutableCharArrayString string_value = new AMutableCharArrayString();
+        if (val.isStringValue(string_value)) {
+            if (functionName.equalsIgnoreCase("absTime")) {
+                tree.setInnerTree(Literal.createAbsTime(string_value));
+            } else if (functionName.equalsIgnoreCase("relTime")) {
+                tree.setInnerTree(Literal.createRelTime(string_value));
+            } else {
+                tree.setInnerTree(FunctionCall.createFunctionCall(functionName, argList));
+            }
+        } else {
+            tree.setInnerTree(FunctionCall.createFunctionCall(functionName, argList));
+        }
+        return tree;
+    }
+
+    /**
+     * Returns the lexer's next token without consuming it.
+     *
+     * @return the peeked token, or {@code LEX_TOKEN_ERROR} when the lexer reports that it was
+     *         not initialized
+     */
+    public TokenType peekToken() throws IOException {
+        if (!lexer.wasInitialized()) {
+            return TokenType.LEX_TOKEN_ERROR;
+        }
+        return lexer.peekToken();
+    }
+
+    /**
+     * Consumes and returns the lexer's next token.
+     *
+     * @return the consumed token, or {@code LEX_TOKEN_ERROR} when the lexer reports that it was
+     *         not initialized
+     */
+    public TokenType consumeToken() throws IOException {
+        if (!lexer.wasInitialized()) {
+            return TokenType.LEX_TOKEN_ERROR;
+        }
+        return lexer.consumeToken();
+    }
+
+    /**
+     * Parses the expression contained in {@code buf} into {@code tree}; delegates to
+     * {@link #parseExpression(String, ExprTreeHolder, boolean)} with {@code full == false}.
+     *
+     * @param buf
+     *            the text to parse
+     * @param tree
+     *            receives the parsed expression
+     * @return the result of the delegated parse
+     * @throws IOException
+     *             propagated from the delegated parse
+     */
+    public boolean parseExpression(String buf, ExprTreeHolder tree) throws IOException {
+        return parseExpression(buf, tree, false);
+    }
+
+    /**
+     * Points the lexer at a string source built from {@code buf} and parses one expression
+     * from it into {@code tree}.
+     *
+     * @param buf
+     *            the text to parse
+     * @param tree
+     *            receives the parsed expression
+     * @param full
+     *            forwarded unchanged to the lexer-based {@code parseExpression(tree, full)}
+     * @return {@code false} when the lexer could not be initialized on the source, otherwise
+     *         the result of the lexer-based parse
+     */
+    public boolean parseExpression(String buf, ExprTreeHolder tree, boolean full) throws IOException {
+        StringLexerSource source = new StringLexerSource(buf);
+        if (!lexer.initialize(source)) {
+            return false;
+        }
+        return parseExpression(tree, full);
+    }
+
+    /**
+     * Parses a ClassAd from the given text; delegates to the two-argument
+     * {@code parseClassAd(String, boolean)} overload, passing {@code false} as the second
+     * argument.
+     *
+     * @param input_basic
+     *            the text to parse
+     * @return whatever the delegated overload returns
+     * @throws IOException
+     *             propagated from the delegated overload
+     */
+    public ClassAd parseClassAd(String input_basic) throws IOException {
+        return parseClassAd(input_basic, false);
+    }
+
+    /**
+     * @return the lexer source currently held in {@code currentSource}
+     */
+    public LexerSource getLexerSource() {
+        return currentSource;
+    }
+
+    /**
+     * Replaces the lexer source held in {@code currentSource}.
+     *
+     * @param lexerSource
+     *            the new source
+     */
+    public void setLexerSource(LexerSource lexerSource) {
+        this.currentSource = lexerSource;
+    }
+
+    /**
+     * Resets this parser's object pools by calling {@code resetPools()}.
+     */
+    public void reset() {
+        resetPools();
+    }
+
+    /**
+     * @return a {@code Literal} obtained from this parser's literal pool
+     */
+    public Literal getLiteral() {
+        return literalPool.get();
+    }
+
+    /**
+     * Reports the kind of data source this parser is configured for.
+     *
+     * @return {@code STREAM} when the configuration names a stream provider (as decided by
+     *         {@code ExternalDataUtils.isDataSourceStreamProvider}), otherwise {@code RECORDS}
+     */
+    @Override
+    public DataSourceType getDataSourceType() {
+        if (ExternalDataUtils.isDataSourceStreamProvider(configuration)) {
+            return DataSourceType.STREAM;
+        }
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Applies the adapter configuration to this parser.
+     * <ul>
+     * <li>{@code old-format} set to true, or a line-separated reader, switches to old-format
+     * parsing and creates a parser on the root ad (once per matching setting).</li>
+     * <li>{@code evaluate=false} turns off both expression evaluation and keeping both forms.</li>
+     * <li>{@code keep-expr=false} turns off keeping both forms and turns evaluation on; it is
+     * applied after {@code evaluate}, so it wins when both are given.</li>
+     * <li>Non-blank expression prefix, suffix and field-name suffix settings replace the
+     * current values.</li>
+     * <li>Unless old format is active, the record start/end markers "[" and "]" are written
+     * back into {@code configuration}.</li>
+     * </ul>
+     *
+     * @param configuration
+     *            the settings; retained by reference and possibly modified
+     * @param recordType
+     *            the record type to produce; retained
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException {
+        this.recordType = recordType;
+        this.configuration = configuration;
+
+        // old format requested explicitly ...
+        final String oldFormatSetting = configuration.get(ClassAdParserFactory.KEY_OLD_FORMAT);
+        if (oldFormatSetting != null && oldFormatSetting.equalsIgnoreCase(ExternalDataConstants.TRUE)) {
+            oldFormat = true;
+            rootAd.createParser();
+        }
+        // ... or implied by a line-separated reader
+        final String readerSetting = configuration.get(ExternalDataConstants.KEY_READER);
+        if (readerSetting != null && readerSetting.equalsIgnoreCase(ExternalDataConstants.READER_LINE_SEPARATED)) {
+            oldFormat = true;
+            rootAd.createParser();
+        }
+
+        final String evaluateSetting = configuration.get(ClassAdParserFactory.KEY_EVALUATE);
+        if ("false".equalsIgnoreCase(evaluateSetting)) {
+            evaluateExpr = false;
+            keepBoth = false;
+        }
+        final String keepExprSetting = configuration.get(ClassAdParserFactory.KEY_KEEP_EXPR);
+        if ("false".equalsIgnoreCase(keepExprSetting)) {
+            keepBoth = false;
+            evaluateExpr = true;
+        }
+
+        // blank values leave the current prefix / suffix settings untouched
+        final String prefixSetting = configuration.get(ClassAdParserFactory.KEY_EXPR_PREFIX);
+        if (prefixSetting != null && !prefixSetting.trim().isEmpty()) {
+            exprPrefix = prefixSetting;
+        }
+        final String suffixSetting = configuration.get(ClassAdParserFactory.KEY_EXPR_SUFFIX);
+        if (suffixSetting != null && !suffixSetting.trim().isEmpty()) {
+            exprSuffix = suffixSetting;
+        }
+        final String nameSuffixSetting = configuration.get(ClassAdParserFactory.KEY_EXPR_NAME_SUFFIX);
+        if (nameSuffixSetting != null && !nameSuffixSetting.trim().isEmpty()) {
+            exprFieldNameSuffix = nameSuffixSetting;
+        }
+
+        if (oldFormat) {
+            return;
+        }
+        configuration.put(ExternalDataConstants.KEY_RECORD_START, "[");
+        configuration.put(ExternalDataConstants.KEY_RECORD_END, "]");
+    }
+
+    /**
+     * Parses one raw record into the root ClassAd and writes it to {@code out} through
+     * {@code parseRecord}.
+     * <p>
+     * In old format the record is read line by line and each non-blank line is inserted into
+     * the root ad; blank lines before the first attribute are skipped and the first blank
+     * line after at least one attribute ends the ad. Otherwise the record's characters become
+     * the new lexer source and the ad is parsed by {@code asterixParseClassAd}.
+     *
+     * @param record
+     *            the raw record characters
+     * @param out
+     *            the destination for the parsed record
+     * @throws IOException
+     *             every failure is rethrown wrapped in a {@link HyracksDataException}
+     */
+    @Override
+    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws IOException {
+        try {
+            if (!oldFormat) {
+                resetPools();
+                currentSource.setNewSource(record.get());
+                rootAd.reset();
+                asterixParseClassAd(rootAd);
+            } else {
+                int maxOffset = record.size();
+                rootAd.clear();
+                char[] buffer = record.get();
+                // NOTE(review): aInt32 presumably carries the read offset for readLine - confirm
+                aInt32.setValue(0);
+                for (String line = readLine(buffer, aInt32, maxOffset); line != null; line =
+                        readLine(buffer, aInt32, maxOffset)) {
+                    if (line.trim().isEmpty()) {
+                        if (rootAd.size() == 0) {
+                            // nothing collected yet: skip leading blank lines
+                            continue;
+                        }
+                        // a blank line after at least one attribute terminates the ad
+                        break;
+                    }
+                    if (!rootAd.insert(line)) {
+                        throw new HyracksDataException("Couldn't parse expression in line: " + line);
+                    }
+                }
+            }
+            parseRecord(recordType, rootAd, out);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * @return {@code char[].class}, the record payload type this parser accepts
+     */
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java
new file mode 100644
index 0000000..97982df
--- /dev/null
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory that creates {@link ClassAdParser} instances for {@code char[]} records.
+ * It keeps the adapter configuration and the target record type and hands both to every
+ * parser it creates.
+ */
+public class ClassAdParserFactory implements IRecordDataParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    // configuration keys understood by the ClassAd parser
+    public static final String KEY_OLD_FORMAT = "old-format";
+    public static final String KEY_EVALUATE = "evaluate";
+    public static final String KEY_KEEP_EXPR = "keep-expr";
+    public static final String KEY_EXPR_PREFIX = "expr-prefix";
+    public static final String KEY_EXPR_SUFFIX = "expr-suffix";
+    public static final String KEY_EXPR_NAME_SUFFIX = "expr-name-suffix";
+
+    private ARecordType recordType;
+    private Map<String, String> configuration;
+    private boolean oldFormat = false;
+
+    /**
+     * Custom serialization: writes the record type followed by the configuration map.
+     * {@code oldFormat} is not written.
+     */
+    private void writeObject(java.io.ObjectOutputStream stream) throws IOException {
+        stream.writeObject(recordType);
+        stream.writeObject(configuration);
+    }
+
+    /**
+     * Custom deserialization: reads the record type and the configuration map in the order
+     * {@code writeObject} wrote them.
+     */
+    @SuppressWarnings("unchecked")
+    private void readObject(java.io.ObjectInputStream stream) throws IOException, ClassNotFoundException {
+        recordType = (ARecordType) stream.readObject();
+        configuration = (Map<String, String>) stream.readObject();
+    }
+
+    /**
+     * @return always {@code DataSourceType.RECORDS}
+     */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Stores the configuration and works out whether old-format parsing is requested, either
+     * through {@code old-format} set to true or through a line-separated reader. When it is
+     * not, the record start/end markers "[" and "]" are written into {@code configuration}.
+     *
+     * @param configuration
+     *            the adapter settings; retained by reference and possibly modified
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+
+        final String oldFormatSetting = configuration.get(KEY_OLD_FORMAT);
+        final String readerSetting = configuration.get(ExternalDataConstants.KEY_READER);
+        final boolean requestedExplicitly =
+                oldFormatSetting != null && oldFormatSetting.equalsIgnoreCase(ExternalDataConstants.TRUE);
+        final boolean impliedByReader = readerSetting != null
+                && readerSetting.equalsIgnoreCase(ExternalDataConstants.READER_LINE_SEPARATED);
+        if (requestedExplicitly || impliedByReader) {
+            oldFormat = true;
+        }
+
+        if (oldFormat) {
+            return;
+        }
+        // new-format records are delimited by square brackets
+        configuration.put(ExternalDataConstants.KEY_RECORD_START, "[");
+        configuration.put(ExternalDataConstants.KEY_RECORD_END, "]");
+    }
+
+    /**
+     * @param recordType
+     *            the record type handed to every parser created afterwards
+     */
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /**
+     * Creates a new {@link ClassAdParser} and configures it with the stored configuration and
+     * record type.
+     *
+     * @param ctx
+     *            the task context; not used by this factory
+     * @return the configured parser
+     */
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException, IOException {
+        final ClassAdParser classAdParser = new ClassAdParser(recordType);
+        classAdParser.configure(configuration, recordType);
+        return classAdParser;
+    }
+
+    /**
+     * @return {@code char[].class}, the record payload type of the created parsers
+     */
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 852b31d..e34a09b 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -21,12 +21,13 @@ package org.apache.asterix.external.library.adapter;
 import java.io.InputStream;
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -59,6 +60,7 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
 
     @Override
     public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        final String nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
         ITupleParserFactory tupleParserFactory = new ITupleParserFactory() {
             private static final long serialVersionUID = 1L;
 
@@ -69,10 +71,13 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
                 ArrayTupleBuilder tb;
                 try {
                     parser = new ADMDataParser();
-                    forwarder = DataflowUtils.getTupleForwarder(configuration);
+                    forwarder = DataflowUtils.getTupleForwarder(configuration,
+                            FeedUtils.getFeedLogManager(ctx, partition,
+                                    FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
+                                            ExternalDataUtils.getFeedName(configuration), nodeId, partition)));
                     forwarder.configure(configuration);
                     tb = new ArrayTupleBuilder(1);
-                } catch (AsterixException e) {
+                } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
                 return new ITupleParser() {