You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:02 UTC

[04/21] incubator-asterixdb git commit: First stage of external data cleanup

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
deleted file mode 100644
index 6e4c175..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
+++ /dev/null
@@ -1,1100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.asterix.builders.AbvsBuilderFactory;
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.builders.ListBuilderFactory;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.RecordBuilderFactory;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.APolygonSerializerDeserializer;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AUnorderedListType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.om.types.hierachy.ITypeConvertComputer;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.om.util.container.IObjectPool;
-import org.apache.asterix.om.util.container.ListObjectPool;
-import org.apache.asterix.runtime.operators.file.adm.AdmLexer;
-import org.apache.asterix.runtime.operators.file.adm.AdmLexerException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IMutableValueStorage;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-/**
- * Parser for ADM formatted data.
- */
-public class ADMDataParser extends AbstractDataParser {
-
-    protected AdmLexer admLexer;
-    protected ARecordType recordType;
-    protected boolean datasetRec;
-
-    private int nullableFieldId = 0;
-    private ArrayBackedValueStorage castBuffer = new ArrayBackedValueStorage();
-
-    private IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<IARecordBuilder, ATypeTag>(
-            new RecordBuilderFactory());
-    private IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<IAsterixListBuilder, ATypeTag>(
-            new ListBuilderFactory());
-    private IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<IMutableValueStorage, ATypeTag>(
-            new AbvsBuilderFactory());
-
-    private String mismatchErrorMessage = "Mismatch Type, expecting a value of type ";
-    private String mismatchErrorMessage2 = " got a value of type ";
-
-    static class ParseException extends AsterixException {
-        private static final long serialVersionUID = 1L;
-        private String filename;
-        private int line = -1;
-        private int column = -1;
-
-        public ParseException(String message) {
-            super(message);
-        }
-
-        public ParseException(Throwable cause) {
-            super(cause);
-        }
-
-        public ParseException(String message, Throwable cause) {
-            super(message, cause);
-        }
-
-        public ParseException(Throwable cause, String filename, int line, int column) {
-            super(cause);
-            setLocation(filename, line, column);
-        }
-
-        public void setLocation(String filename, int line, int column) {
-            this.filename = filename;
-            this.line = line;
-            this.column = column;
-        }
-
-        @Override
-        public String getMessage() {
-            StringBuilder msg = new StringBuilder("Parse error");
-            if (filename != null) {
-                msg.append(" in file " + filename);
-            }
-            if (line >= 0) {
-                if (column >= 0) {
-                    msg.append(" at (" + line + ", " + column + ")");
-                } else {
-                    msg.append(" in line " + line);
-                }
-            }
-            return msg.append(": " + super.getMessage()).toString();
-        }
-    }
-
-    public ADMDataParser() {
-        this(null);
-    }
-
-    public ADMDataParser(String filename) {
-        this.filename = filename;
-    }
-
-    @Override
-    public boolean parse(DataOutput out) throws AsterixException {
-        try {
-            resetPools();
-            return parseAdmInstance(recordType, datasetRec, out);
-        } catch (IOException e) {
-            throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
-        } catch (AdmLexerException e) {
-            throw new AsterixException(e);
-        } catch (ParseException e) {
-            e.setLocation(filename, admLexer.getLine(), admLexer.getColumn());
-            throw e;
-        }
-    }
-
-    @Override
-    public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException {
-        this.recordType = recordType;
-        this.datasetRec = datasetRec;
-        try {
-            admLexer = new AdmLexer(new java.io.InputStreamReader(in));
-        } catch (IOException e) {
-            throw new ParseException(e);
-        }
-    }
-
-    protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
-            throws AsterixException, IOException, AdmLexerException {
-        int token = admLexer.next();
-        if (token == AdmLexer.TOKEN_EOF) {
-            return false;
-        } else {
-            admFromLexerStream(token, objectType, out, datasetRec);
-            return true;
-        }
-    }
-
-    private void admFromLexerStream(int token, IAType objectType, DataOutput out, Boolean datasetRec)
-            throws AsterixException, IOException, AdmLexerException {
-
-        switch (token) {
-            case AdmLexer.TOKEN_NULL_LITERAL: {
-                if (checkType(ATypeTag.NULL, objectType)) {
-                    nullSerde.serialize(ANull.NULL, out);
-                } else {
-                    throw new ParseException("This field can not be null");
-                }
-                break;
-            }
-            case AdmLexer.TOKEN_TRUE_LITERAL: {
-                if (checkType(ATypeTag.BOOLEAN, objectType)) {
-                    booleanSerde.serialize(ABoolean.TRUE, out);
-                } else {
-                    throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
-                }
-                break;
-            }
-            case AdmLexer.TOKEN_BOOLEAN_CONS: {
-                parseConstructor(ATypeTag.BOOLEAN, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_FALSE_LITERAL: {
-                if (checkType(ATypeTag.BOOLEAN, objectType)) {
-                    booleanSerde.serialize(ABoolean.FALSE, out);
-                } else {
-                    throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
-                }
-                break;
-            }
-            case AdmLexer.TOKEN_DOUBLE_LITERAL: {
-                parseToNumericTarget(ATypeTag.DOUBLE, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_DOUBLE_CONS: {
-                parseConstructor(ATypeTag.DOUBLE, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_FLOAT_LITERAL: {
-                parseToNumericTarget(ATypeTag.FLOAT, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_FLOAT_CONS: {
-                parseConstructor(ATypeTag.FLOAT, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT8_LITERAL: {
-                parseAndCastNumeric(ATypeTag.INT8, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT8_CONS: {
-                parseConstructor(ATypeTag.INT8, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT16_LITERAL: {
-                parseAndCastNumeric(ATypeTag.INT16, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT16_CONS: {
-                parseConstructor(ATypeTag.INT16, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT_LITERAL: {
-                // For an INT value without any suffix, we return it as INT64 type value since it is the default integer type.
-                parseAndCastNumeric(ATypeTag.INT64, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT32_LITERAL: {
-                parseAndCastNumeric(ATypeTag.INT32, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT32_CONS: {
-                parseConstructor(ATypeTag.INT32, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT64_LITERAL: {
-                parseAndCastNumeric(ATypeTag.INT64, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INT64_CONS: {
-                parseConstructor(ATypeTag.INT64, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_STRING_LITERAL: {
-                if (checkType(ATypeTag.STRING, objectType)) {
-                    final String tokenImage = admLexer.getLastTokenImage().substring(1,
-                            admLexer.getLastTokenImage().length() - 1);
-                    aString.setValue(admLexer.containsEscapes() ? replaceEscapes(tokenImage) : tokenImage);
-                    stringSerde.serialize(aString, out);
-                } else if (checkType(ATypeTag.UUID, objectType)) {
-                    // Dealing with UUID type that is represented by a string
-                    final String tokenImage = admLexer.getLastTokenImage().substring(1,
-                            admLexer.getLastTokenImage().length() - 1);
-                    aUUID.fromStringToAMuatbleUUID(tokenImage);
-                    uuidSerde.serialize(aUUID, out);
-                } else {
-                    throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
-                }
-                break;
-            }
-            case AdmLexer.TOKEN_STRING_CONS: {
-                parseConstructor(ATypeTag.STRING, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_HEX_CONS:
-            case AdmLexer.TOKEN_BASE64_CONS: {
-                if (checkType(ATypeTag.BINARY, objectType)) {
-                    if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
-                        if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
-                            parseToBinaryTarget(token, admLexer.getLastTokenImage(), out);
-                            if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
-                                break;
-                            }
-                        }
-                    }
-                }
-                throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
-            }
-            case AdmLexer.TOKEN_DATE_CONS: {
-                parseConstructor(ATypeTag.DATE, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_TIME_CONS: {
-                parseConstructor(ATypeTag.TIME, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_DATETIME_CONS: {
-                parseConstructor(ATypeTag.DATETIME, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_INTERVAL_DATE_CONS: {
-                if (checkType(ATypeTag.INTERVAL, objectType)) {
-                    if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
-                        if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
-                            parseDateInterval(admLexer.getLastTokenImage(), out);
-                            if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
-                                break;
-                            }
-                        }
-                    }
-                }
-                throw new ParseException("Wrong interval data parsing for date interval.");
-            }
-            case AdmLexer.TOKEN_INTERVAL_TIME_CONS: {
-                if (checkType(ATypeTag.INTERVAL, objectType)) {
-                    if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
-                        if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
-                            parseTimeInterval(admLexer.getLastTokenImage(), out);
-                            if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
-                                break;
-                            }
-                        }
-                    }
-                }
-                throw new ParseException("Wrong interval data parsing for time interval.");
-            }
-            case AdmLexer.TOKEN_INTERVAL_DATETIME_CONS: {
-                if (checkType(ATypeTag.INTERVAL, objectType)) {
-                    if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
-                        if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
-                            parseDateTimeInterval(admLexer.getLastTokenImage(), out);
-                            if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
-                                break;
-                            }
-                        }
-                    }
-                }
-                throw new ParseException("Wrong interval data parsing for datetime interval.");
-            }
-            case AdmLexer.TOKEN_DURATION_CONS: {
-                parseConstructor(ATypeTag.DURATION, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_YEAR_MONTH_DURATION_CONS: {
-                parseConstructor(ATypeTag.YEARMONTHDURATION, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_DAY_TIME_DURATION_CONS: {
-                parseConstructor(ATypeTag.DAYTIMEDURATION, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_POINT_CONS: {
-                parseConstructor(ATypeTag.POINT, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_POINT3D_CONS: {
-                parseConstructor(ATypeTag.POINT3D, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_CIRCLE_CONS: {
-                parseConstructor(ATypeTag.CIRCLE, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_RECTANGLE_CONS: {
-                parseConstructor(ATypeTag.RECTANGLE, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_LINE_CONS: {
-                parseConstructor(ATypeTag.LINE, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_POLYGON_CONS: {
-                parseConstructor(ATypeTag.POLYGON, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_START_UNORDERED_LIST: {
-                if (checkType(ATypeTag.UNORDEREDLIST, objectType)) {
-                    objectType = getComplexType(objectType, ATypeTag.UNORDEREDLIST);
-                    parseUnorderedList((AUnorderedListType) objectType, out);
-                } else {
-                    throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
-                }
-                break;
-            }
-
-            case AdmLexer.TOKEN_START_ORDERED_LIST: {
-                if (checkType(ATypeTag.ORDEREDLIST, objectType)) {
-                    objectType = getComplexType(objectType, ATypeTag.ORDEREDLIST);
-                    parseOrderedList((AOrderedListType) objectType, out);
-                } else {
-                    throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
-                }
-                break;
-            }
-            case AdmLexer.TOKEN_START_RECORD: {
-                if (checkType(ATypeTag.RECORD, objectType)) {
-                    objectType = getComplexType(objectType, ATypeTag.RECORD);
-                    parseRecord((ARecordType) objectType, out, datasetRec);
-                } else {
-                    throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
-                }
-                break;
-            }
-            case AdmLexer.TOKEN_UUID_CONS: {
-                parseConstructor(ATypeTag.UUID, objectType, out);
-                break;
-            }
-            case AdmLexer.TOKEN_EOF: {
-                break;
-            }
-            default: {
-                throw new ParseException("Unexpected ADM token kind: " + AdmLexer.tokenKindToString(token) + ".");
-            }
-        }
-
-    }
-
-    private String replaceEscapes(String tokenImage) throws ParseException {
-        char[] chars = tokenImage.toCharArray();
-        int len = chars.length;
-        int readpos = 0;
-        int writepos = 0;
-        int movemarker = 0;
-        while (readpos < len) {
-            if (chars[readpos] == '\\') {
-                moveChars(chars, movemarker, readpos, readpos - writepos);
-                switch (chars[readpos + 1]) {
-                    case '\\':
-                    case '\"':
-                    case '/':
-                        chars[writepos] = chars[readpos + 1];
-                        break;
-                    case 'b':
-                        chars[writepos] = '\b';
-                        break;
-                    case 'f':
-                        chars[writepos] = '\f';
-                        break;
-                    case 'n':
-                        chars[writepos] = '\n';
-                        break;
-                    case 'r':
-                        chars[writepos] = '\r';
-                        break;
-                    case 't':
-                        chars[writepos] = '\t';
-                        break;
-                    case 'u':
-                        chars[writepos] = (char) Integer.parseInt(new String(chars, readpos + 2, 4), 16);
-                        readpos += 4;
-                        break;
-                    default:
-                        throw new ParseException("Illegal escape '\\" + chars[readpos + 1] + "'");
-                }
-                ++readpos;
-                movemarker = readpos + 1;
-            }
-            ++writepos;
-            ++readpos;
-        }
-        moveChars(chars, movemarker, len, readpos - writepos);
-        return new String(chars, 0, len - (readpos - writepos));
-    }
-
-    private static void moveChars(final char[] chars, final int start, final int end, final int offset) {
-        if (offset == 0) {
-            return;
-        }
-        for (int i = start; i < end; ++i) {
-            chars[i - offset] = chars[i];
-        }
-    }
-
-    private IAType getComplexType(IAType aObjectType, ATypeTag tag) {
-        if (aObjectType == null) {
-            return null;
-        }
-
-        if (aObjectType.getTypeTag() == tag) {
-            return aObjectType;
-        }
-
-        if (aObjectType.getTypeTag() == ATypeTag.UNION) {
-            List<IAType> unionList = ((AUnionType) aObjectType).getUnionList();
-            for (int i = 0; i < unionList.size(); i++) {
-                if (unionList.get(i).getTypeTag() == tag) {
-                    return unionList.get(i);
-                }
-            }
-        }
-        return null; // wont get here
-    }
-
-    private ATypeTag getTargetTypeTag(ATypeTag expectedTypeTag, IAType aObjectType) throws IOException {
-        if (aObjectType == null) {
-            return expectedTypeTag;
-        }
-        if (aObjectType.getTypeTag() != ATypeTag.UNION) {
-            final ATypeTag typeTag = aObjectType.getTypeTag();
-            if (ATypeHierarchy.canPromote(expectedTypeTag, typeTag)
-                    || ATypeHierarchy.canDemote(expectedTypeTag, typeTag)) {
-                return typeTag;
-            } else {
-                return null;
-            }
-            //            return ATypeHierarchy.canPromote(expectedTypeTag, typeTag) ? typeTag : null;
-        } else { // union
-            List<IAType> unionList = ((AUnionType) aObjectType).getUnionList();
-            for (IAType t : unionList) {
-                final ATypeTag typeTag = t.getTypeTag();
-                if (ATypeHierarchy.canPromote(expectedTypeTag, typeTag)
-                        || ATypeHierarchy.canDemote(expectedTypeTag, typeTag)) {
-                    return typeTag;
-                }
-            }
-        }
-        return null;
-    }
-
-    private boolean checkType(ATypeTag expectedTypeTag, IAType aObjectType) throws IOException {
-        return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
-    }
-
-    private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
-            throws IOException, AsterixException, AdmLexerException {
-
-        ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
-        ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
-        IARecordBuilder recBuilder = getRecordBuilder();
-
-        BitSet nulls = null;
-        if (datasetRec) {
-            if (recType != null) {
-                nulls = new BitSet(recType.getFieldNames().length);
-                recBuilder.reset(recType);
-            } else {
-                recBuilder.reset(null);
-            }
-        } else if (recType != null) {
-            nulls = new BitSet(recType.getFieldNames().length);
-            recBuilder.reset(recType);
-        } else {
-            recBuilder.reset(null);
-        }
-
-        recBuilder.init();
-        int token;
-        boolean inRecord = true;
-        boolean expectingRecordField = false;
-        boolean first = true;
-
-        Boolean openRecordField = false;
-        int fieldId = 0;
-        IAType fieldType = null;
-        do {
-            token = admLexer.next();
-            switch (token) {
-                case AdmLexer.TOKEN_END_RECORD: {
-                    if (expectingRecordField) {
-                        throw new ParseException("Found END_RECORD while expecting a record field.");
-                    }
-                    inRecord = false;
-                    break;
-                }
-                case AdmLexer.TOKEN_STRING_LITERAL: {
-                    // we've read the name of the field
-                    // now read the content
-                    fieldNameBuffer.reset();
-                    fieldValueBuffer.reset();
-                    expectingRecordField = false;
-
-                    if (recType != null) {
-                        String fldName = admLexer.getLastTokenImage().substring(1,
-                                admLexer.getLastTokenImage().length() - 1);
-                        fieldId = recBuilder.getFieldId(fldName);
-                        if (fieldId < 0 && !recType.isOpen()) {
-                            throw new ParseException("This record is closed, you can not add extra fields !!");
-                        } else if (fieldId < 0 && recType.isOpen()) {
-                            aStringFieldName.setValue(admLexer.getLastTokenImage().substring(1,
-                                    admLexer.getLastTokenImage().length() - 1));
-                            stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
-                            openRecordField = true;
-                            fieldType = null;
-                        } else {
-                            // a closed field
-                            nulls.set(fieldId);
-                            fieldType = recType.getFieldTypes()[fieldId];
-                            openRecordField = false;
-                        }
-                    } else {
-                        aStringFieldName.setValue(
-                                admLexer.getLastTokenImage().substring(1, admLexer.getLastTokenImage().length() - 1));
-                        stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
-                        openRecordField = true;
-                        fieldType = null;
-                    }
-
-                    token = admLexer.next();
-                    if (token != AdmLexer.TOKEN_COLON) {
-                        throw new ParseException("Unexpected ADM token kind: " + AdmLexer.tokenKindToString(token)
-                                + " while expecting \":\".");
-                    }
-
-                    token = admLexer.next();
-                    this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput(), false);
-                    if (openRecordField) {
-                        if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
-                            recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
-                        }
-                    } else if (NonTaggedFormatUtil.isOptional(recType)) {
-                        if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
-                            recBuilder.addField(fieldId, fieldValueBuffer);
-                        }
-                    } else {
-                        recBuilder.addField(fieldId, fieldValueBuffer);
-                    }
-
-                    break;
-                }
-                case AdmLexer.TOKEN_COMMA: {
-                    if (first) {
-                        throw new ParseException("Found COMMA before any record field.");
-                    }
-                    if (expectingRecordField) {
-                        throw new ParseException("Found COMMA while expecting a record field.");
-                    }
-                    expectingRecordField = true;
-                    break;
-                }
-                default: {
-                    throw new ParseException("Unexpected ADM token kind: " + AdmLexer.tokenKindToString(token)
-                            + " while parsing record fields.");
-                }
-            }
-            first = false;
-        } while (inRecord);
-
-        if (recType != null) {
-            nullableFieldId = checkNullConstraints(recType, nulls);
-            if (nullableFieldId != -1) {
-                throw new ParseException("Field: " + recType.getFieldNames()[nullableFieldId] + " can not be null");
-            }
-        }
-        recBuilder.write(out, true);
-    }
-
-    private int checkNullConstraints(ARecordType recType, BitSet nulls) {
-        boolean isNull = false;
-        for (int i = 0; i < recType.getFieldTypes().length; i++) {
-            if (nulls.get(i) == false) {
-                IAType type = recType.getFieldTypes()[i];
-                if (type.getTypeTag() != ATypeTag.NULL && type.getTypeTag() != ATypeTag.UNION) {
-                    return i;
-                }
-
-                if (type.getTypeTag() == ATypeTag.UNION) { // union
-                    List<IAType> unionList = ((AUnionType) type).getUnionList();
-                    for (int j = 0; j < unionList.size(); j++) {
-                        if (unionList.get(j).getTypeTag() == ATypeTag.NULL) {
-                            isNull = true;
-                            break;
-                        }
-                    }
-                    if (!isNull) {
-                        return i;
-                    }
-                }
-            }
-        }
-        return -1;
-    }
-
-    private void parseOrderedList(AOrderedListType oltype, DataOutput out)
-            throws IOException, AsterixException, AdmLexerException {
-        ArrayBackedValueStorage itemBuffer = getTempBuffer();
-        OrderedListBuilder orderedListBuilder = (OrderedListBuilder) getOrderedListBuilder();
-
-        IAType itemType = null;
-        if (oltype != null) {
-            itemType = oltype.getItemType();
-        }
-        orderedListBuilder.reset(oltype);
-
-        int token;
-        boolean inList = true;
-        boolean expectingListItem = false;
-        boolean first = true;
-        do {
-            token = admLexer.next();
-            if (token == AdmLexer.TOKEN_END_ORDERED_LIST) {
-                if (expectingListItem) {
-                    throw new ParseException("Found END_COLLECTION while expecting a list item.");
-                }
-                inList = false;
-            } else if (token == AdmLexer.TOKEN_COMMA) {
-                if (first) {
-                    throw new ParseException("Found COMMA before any list item.");
-                }
-                if (expectingListItem) {
-                    throw new ParseException("Found COMMA while expecting a list item.");
-                }
-                expectingListItem = true;
-            } else {
-                expectingListItem = false;
-                itemBuffer.reset();
-
-                admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
-                orderedListBuilder.addItem(itemBuffer);
-            }
-            first = false;
-        } while (inList);
-        orderedListBuilder.write(out, true);
-    }
-
-    private void parseUnorderedList(AUnorderedListType uoltype, DataOutput out)
-            throws IOException, AsterixException, AdmLexerException {
-        ArrayBackedValueStorage itemBuffer = getTempBuffer();
-        UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
-
-        IAType itemType = null;
-
-        if (uoltype != null) {
-            itemType = uoltype.getItemType();
-        }
-        unorderedListBuilder.reset(uoltype);
-
-        int token;
-        boolean inList = true;
-        boolean expectingListItem = false;
-        boolean first = true;
-        do {
-            token = admLexer.next();
-            if (token == AdmLexer.TOKEN_END_RECORD) {
-                if (admLexer.next() == AdmLexer.TOKEN_END_RECORD) {
-                    if (expectingListItem) {
-                        throw new ParseException("Found END_COLLECTION while expecting a list item.");
-                    } else {
-                        inList = false;
-                    }
-                } else {
-                    throw new ParseException("Found END_RECORD while expecting a list item.");
-                }
-            } else if (token == AdmLexer.TOKEN_COMMA) {
-                if (first) {
-                    throw new ParseException("Found COMMA before any list item.");
-                }
-                if (expectingListItem) {
-                    throw new ParseException("Found COMMA while expecting a list item.");
-                }
-                expectingListItem = true;
-            } else {
-                expectingListItem = false;
-                itemBuffer.reset();
-                admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
-                unorderedListBuilder.addItem(itemBuffer);
-            }
-            first = false;
-        } while (inList);
-        unorderedListBuilder.write(out, true);
-    }
-
-    private IARecordBuilder getRecordBuilder() {
-        return recordBuilderPool.allocate(ATypeTag.RECORD);
-    }
-
-    private IAsterixListBuilder getOrderedListBuilder() {
-        return listBuilderPool.allocate(ATypeTag.ORDEREDLIST);
-    }
-
-    private IAsterixListBuilder getUnorderedListBuilder() {
-        return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
-    }
-
-    private ArrayBackedValueStorage getTempBuffer() {
-        return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
-    }
-
-    private void parseToBinaryTarget(int lexerToken, String tokenImage, DataOutput out)
-            throws ParseException, HyracksDataException {
-        switch (lexerToken) {
-            case AdmLexer.TOKEN_HEX_CONS: {
-                parseHexBinaryString(tokenImage.toCharArray(), 1, tokenImage.length() - 2, out);
-                break;
-            }
-            case AdmLexer.TOKEN_BASE64_CONS: {
-                parseBase64BinaryString(tokenImage.toCharArray(), 1, tokenImage.length() - 2, out);
-                break;
-            }
-        }
-    }
-
-    private void parseToNumericTarget(ATypeTag typeTag, IAType objectType, DataOutput out)
-            throws AsterixException, IOException {
-        final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
-        if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), targetTypeTag, out)) {
-            throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
-        }
-    }
-
-    private void parseAndCastNumeric(ATypeTag typeTag, IAType objectType, DataOutput out)
-            throws AsterixException, IOException {
-        final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
-        DataOutput dataOutput = out;
-        if (targetTypeTag != typeTag) {
-            castBuffer.reset();
-            dataOutput = castBuffer.getDataOutput();
-        }
-
-        if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), typeTag, dataOutput)) {
-            throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
-        }
-
-        // If two type tags are not the same, either we try to promote or demote source type to the target type
-        if (targetTypeTag != typeTag) {
-            if (ATypeHierarchy.canPromote(typeTag, targetTypeTag)) {
-                // can promote typeTag to targetTypeTag
-                ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag, targetTypeTag);
-                if (promoteComputer == null) {
-                    throw new AsterixException(
-                            "Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
-                }
-                // do the promotion; note that the type tag field should be skipped
-                promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                        castBuffer.getLength() - 1, out);
-            } else if (ATypeHierarchy.canDemote(typeTag, targetTypeTag)) {
-                //can demote source type to the target type
-                ITypeConvertComputer demoteComputer = ATypeHierarchy.getTypeDemoteComputer(typeTag, targetTypeTag);
-                if (demoteComputer == null) {
-                    throw new AsterixException(
-                            "Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
-                }
-                // do the demotion; note that the type tag field should be skipped
-                demoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                        castBuffer.getLength() - 1, out);
-            }
-        }
-    }
-
-    private void parseConstructor(ATypeTag typeTag, IAType objectType, DataOutput out)
-            throws AsterixException, AdmLexerException, IOException {
-        final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
-        if (targetTypeTag != null) {
-            DataOutput dataOutput = out;
-            if (targetTypeTag != typeTag) {
-                castBuffer.reset();
-                dataOutput = castBuffer.getDataOutput();
-            }
-            int token = admLexer.next();
-            if (token == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
-                token = admLexer.next();
-                if (token == AdmLexer.TOKEN_STRING_LITERAL) {
-                    final String unquoted = admLexer.getLastTokenImage().substring(1,
-                            admLexer.getLastTokenImage().length() - 1);
-                    if (!parseValue(unquoted, typeTag, dataOutput)) {
-                        throw new ParseException("Missing deserializer method for constructor: "
-                                + AdmLexer.tokenKindToString(token) + ".");
-                    }
-                    token = admLexer.next();
-                    if (token == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
-                        if (targetTypeTag != typeTag) {
-                            ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag,
-                                    targetTypeTag);
-                            // the availability if the promote computer should be consistent with the availability of a target type
-                            assert promoteComputer != null;
-                            // do the promotion; note that the type tag field should be skipped
-                            promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                                    castBuffer.getLength() - 1, out);
-                        }
-                        return;
-                    }
-                }
-            }
-        }
-        throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + ". Got " + typeTag + " instead.");
-    }
-
-    private boolean parseValue(final String unquoted, ATypeTag typeTag, DataOutput out)
-            throws AsterixException, HyracksDataException, IOException {
-        switch (typeTag) {
-            case BOOLEAN:
-                parseBoolean(unquoted, out);
-                return true;
-            case INT8:
-                parseInt8(unquoted, out);
-                return true;
-            case INT16:
-                parseInt16(unquoted, out);
-                return true;
-            case INT32:
-                parseInt32(unquoted, out);
-                return true;
-            case INT64:
-                parseInt64(unquoted, out);
-                return true;
-            case FLOAT:
-                aFloat.setValue(Float.parseFloat(unquoted));
-                floatSerde.serialize(aFloat, out);
-                return true;
-            case DOUBLE:
-                aDouble.setValue(Double.parseDouble(unquoted));
-                doubleSerde.serialize(aDouble, out);
-                return true;
-            case STRING:
-                aString.setValue(unquoted);
-                stringSerde.serialize(aString, out);
-                return true;
-            case TIME:
-                parseTime(unquoted, out);
-                return true;
-            case DATE:
-                parseDate(unquoted, out);
-                return true;
-            case DATETIME:
-                parseDateTime(unquoted, out);
-                return true;
-            case DURATION:
-                parseDuration(unquoted, out);
-                return true;
-            case DAYTIMEDURATION:
-                parseDateTimeDuration(unquoted, out);
-                return true;
-            case YEARMONTHDURATION:
-                parseYearMonthDuration(unquoted, out);
-                return true;
-            case POINT:
-                parsePoint(unquoted, out);
-                return true;
-            case POINT3D:
-                parse3DPoint(unquoted, out);
-                return true;
-            case CIRCLE:
-                parseCircle(unquoted, out);
-                return true;
-            case RECTANGLE:
-                parseRectangle(unquoted, out);
-                return true;
-            case LINE:
-                parseLine(unquoted, out);
-                return true;
-            case POLYGON:
-                APolygonSerializerDeserializer.parse(unquoted, out);
-                return true;
-            case UUID:
-                aUUID.fromStringToAMuatbleUUID(unquoted);
-                uuidSerde.serialize(aUUID, out);
-                return true;
-            default:
-                return false;
-        }
-    }
-
-    private void parseBoolean(String bool, DataOutput out) throws AsterixException, HyracksDataException {
-        String errorMessage = "This can not be an instance of boolean";
-        if (bool.equals("true")) {
-            booleanSerde.serialize(ABoolean.TRUE, out);
-        } else if (bool.equals("false")) {
-            booleanSerde.serialize(ABoolean.FALSE, out);
-        } else {
-            throw new ParseException(errorMessage);
-        }
-    }
-
-    private void parseInt8(String int8, DataOutput out) throws AsterixException, HyracksDataException {
-        String errorMessage = "This can not be an instance of int8";
-        boolean positive = true;
-        byte value = 0;
-        int offset = 0;
-
-        if (int8.charAt(offset) == '+') {
-            offset++;
-        } else if (int8.charAt(offset) == '-') {
-            offset++;
-            positive = false;
-        }
-        for (; offset < int8.length(); offset++) {
-            if (int8.charAt(offset) >= '0' && int8.charAt(offset) <= '9') {
-                value = (byte) (value * 10 + int8.charAt(offset) - '0');
-            } else if (int8.charAt(offset) == 'i' && int8.charAt(offset + 1) == '8' && offset + 2 == int8.length()) {
-                break;
-            } else {
-                throw new ParseException(errorMessage);
-            }
-        }
-        if (value < 0) {
-            throw new ParseException(errorMessage);
-        }
-        if (value > 0 && !positive) {
-            value *= -1;
-        }
-        aInt8.setValue(value);
-        int8Serde.serialize(aInt8, out);
-    }
-
-    private void parseInt16(String int16, DataOutput out) throws AsterixException, HyracksDataException {
-        String errorMessage = "This can not be an instance of int16";
-        boolean positive = true;
-        short value = 0;
-        int offset = 0;
-
-        if (int16.charAt(offset) == '+') {
-            offset++;
-        } else if (int16.charAt(offset) == '-') {
-            offset++;
-            positive = false;
-        }
-        for (; offset < int16.length(); offset++) {
-            if (int16.charAt(offset) >= '0' && int16.charAt(offset) <= '9') {
-                value = (short) (value * 10 + int16.charAt(offset) - '0');
-            } else if (int16.charAt(offset) == 'i' && int16.charAt(offset + 1) == '1' && int16.charAt(offset + 2) == '6'
-                    && offset + 3 == int16.length()) {
-                break;
-            } else {
-                throw new ParseException(errorMessage);
-            }
-        }
-        if (value < 0) {
-            throw new ParseException(errorMessage);
-        }
-        if (value > 0 && !positive) {
-            value *= -1;
-        }
-        aInt16.setValue(value);
-        int16Serde.serialize(aInt16, out);
-    }
-
-    private void parseInt32(String int32, DataOutput out) throws AsterixException, HyracksDataException {
-        String errorMessage = "This can not be an instance of int32";
-        boolean positive = true;
-        int value = 0;
-        int offset = 0;
-
-        if (int32.charAt(offset) == '+') {
-            offset++;
-        } else if (int32.charAt(offset) == '-') {
-            offset++;
-            positive = false;
-        }
-        for (; offset < int32.length(); offset++) {
-            if (int32.charAt(offset) >= '0' && int32.charAt(offset) <= '9') {
-                value = (value * 10 + int32.charAt(offset) - '0');
-            } else if (int32.charAt(offset) == 'i' && int32.charAt(offset + 1) == '3' && int32.charAt(offset + 2) == '2'
-                    && offset + 3 == int32.length()) {
-                break;
-            } else {
-                throw new ParseException(errorMessage);
-            }
-        }
-        if (value < 0) {
-            throw new ParseException(errorMessage);
-        }
-        if (value > 0 && !positive) {
-            value *= -1;
-        }
-
-        aInt32.setValue(value);
-        int32Serde.serialize(aInt32, out);
-    }
-
-    private void parseInt64(String int64, DataOutput out) throws AsterixException, HyracksDataException {
-        String errorMessage = "This can not be an instance of int64";
-        boolean positive = true;
-        long value = 0;
-        int offset = 0;
-
-        if (int64.charAt(offset) == '+') {
-            offset++;
-        } else if (int64.charAt(offset) == '-') {
-            offset++;
-            positive = false;
-        }
-        for (; offset < int64.length(); offset++) {
-            if (int64.charAt(offset) >= '0' && int64.charAt(offset) <= '9') {
-                value = (value * 10 + int64.charAt(offset) - '0');
-            } else if (int64.charAt(offset) == 'i' && int64.charAt(offset + 1) == '6' && int64.charAt(offset + 2) == '4'
-                    && offset + 3 == int64.length()) {
-                break;
-            } else {
-                throw new ParseException(errorMessage);
-            }
-        }
-        if (value < 0) {
-            throw new ParseException(errorMessage);
-        }
-        if (value > 0 && !positive) {
-            value *= -1;
-        }
-
-        aInt64.setValue(value);
-        int64Serde.serialize(aInt64, out);
-    }
-
-    /**
-     * Resets the pools before parsing a top-level record.
-     * In this way the elements in those pools can be re-used.
-     */
-    private void resetPools() {
-        listBuilderPool.reset();
-        recordBuilderPool.reset();
-        abvsBuilderPool.reset();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
deleted file mode 100644
index 794097f..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
+++ /dev/null
@@ -1,521 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ABinary;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.ACircle;
-import org.apache.asterix.om.base.ADate;
-import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.ADayTimeDuration;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.ADuration;
-import org.apache.asterix.om.base.AFloat;
-import org.apache.asterix.om.base.AInt16;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AInt8;
-import org.apache.asterix.om.base.AInterval;
-import org.apache.asterix.om.base.ALine;
-import org.apache.asterix.om.base.AMutableBinary;
-import org.apache.asterix.om.base.AMutableCircle;
-import org.apache.asterix.om.base.AMutableDate;
-import org.apache.asterix.om.base.AMutableDateTime;
-import org.apache.asterix.om.base.AMutableDayTimeDuration;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableDuration;
-import org.apache.asterix.om.base.AMutableFloat;
-import org.apache.asterix.om.base.AMutableInt16;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableInt8;
-import org.apache.asterix.om.base.AMutableInterval;
-import org.apache.asterix.om.base.AMutableLine;
-import org.apache.asterix.om.base.AMutablePoint;
-import org.apache.asterix.om.base.AMutablePoint3D;
-import org.apache.asterix.om.base.AMutableRectangle;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AMutableTime;
-import org.apache.asterix.om.base.AMutableUUID;
-import org.apache.asterix.om.base.AMutableYearMonthDuration;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.base.APoint;
-import org.apache.asterix.om.base.APoint3D;
-import org.apache.asterix.om.base.ARectangle;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.ATime;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.asterix.om.base.AYearMonthDuration;
-import org.apache.asterix.om.base.temporal.ADateParserFactory;
-import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.om.base.temporal.ADurationParserFactory.ADurationParseOption;
-import org.apache.asterix.om.base.temporal.ATimeParserFactory;
-import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.bytes.Base64Parser;
-import org.apache.hyracks.util.bytes.HexParser;
-
-/**
- * Base class for data parsers. Includes the common set of definitions for
- * serializers/deserializers for built-in ADM types.
- */
-public abstract class AbstractDataParser implements IDataParser {
-
-    protected AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    protected AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    protected AMutableInt32 aInt32 = new AMutableInt32(0);
-    protected AMutableInt64 aInt64 = new AMutableInt64(0);
-    protected AMutableDouble aDouble = new AMutableDouble(0);
-    protected AMutableFloat aFloat = new AMutableFloat(0);
-    protected AMutableString aString = new AMutableString("");
-    protected AMutableBinary aBinary = new AMutableBinary(null, 0, 0);
-    protected AMutableString aStringFieldName = new AMutableString("");
-    protected AMutableUUID aUUID = new AMutableUUID(0, 0);
-    // For temporal and spatial data types
-    protected AMutableTime aTime = new AMutableTime(0);
-    protected AMutableDateTime aDateTime = new AMutableDateTime(0L);
-    protected AMutableDuration aDuration = new AMutableDuration(0, 0);
-    protected AMutableDayTimeDuration aDayTimeDuration = new AMutableDayTimeDuration(0);
-    protected AMutableYearMonthDuration aYearMonthDuration = new AMutableYearMonthDuration(0);
-    protected AMutablePoint aPoint = new AMutablePoint(0, 0);
-    protected AMutablePoint3D aPoint3D = new AMutablePoint3D(0, 0, 0);
-    protected AMutableCircle aCircle = new AMutableCircle(null, 0);
-    protected AMutableRectangle aRectangle = new AMutableRectangle(null, null);
-    protected AMutablePoint aPoint2 = new AMutablePoint(0, 0);
-    protected AMutableLine aLine = new AMutableLine(null, null);
-    protected AMutableDate aDate = new AMutableDate(0);
-    protected final AMutableInterval aInterval = new AMutableInterval(0L, 0L, (byte) 0);
-
-    // Serializers
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ABinary> binarySerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABINARY);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AFloat> floatSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AFLOAT);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt8> int8Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT8);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt16> int16Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT16);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ANULL);
-
-    protected final HexParser hexParser = new HexParser();
-    protected final Base64Parser base64Parser = new Base64Parser();
-
-    // For UUID, we assume that the format is the string representation of UUID
-    // (xxxxxxxx-xxxx-xxxx-xxxxxxxxxxxx) when parsing the data.
-    // Thus, we need to call UUID.fromStringToAMuatbleUUID() to convert it to the internal representation (two long values).
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AUUID> uuidSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AUUID);
-
-    // To avoid race conditions, the serdes for temporal and spatial data types needs to be one per parser
-    // ^^^^^^^^^^^^^^^^^^^^^^^^ ??? then why all these serdes are static?
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ATime> timeSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ATIME);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADate> dateSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADATE);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADateTime> datetimeSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADATETIME);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADuration> durationSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADURATION);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADayTimeDuration> dayTimeDurationSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADAYTIMEDURATION);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<AYearMonthDuration> yearMonthDurationSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AYEARMONTHDURATION);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<APoint> pointSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.APOINT);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<APoint3D> point3DSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.APOINT3D);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<ACircle> circleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ACIRCLE);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<ARectangle> rectangleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ARECTANGLE);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<ALine> lineSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ALINE);
-    @SuppressWarnings("unchecked")
-    private static final ISerializerDeserializer<AInterval> intervalSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINTERVAL);
-
-    protected String filename;
-
-    void setFilename(String filename) {
-        this.filename = filename;
-    }
-
-    protected void parseTime(String time, DataOutput out) throws HyracksDataException {
-        int chrononTimeInMs;
-        try {
-            chrononTimeInMs = ATimeParserFactory.parseTimePart(time, 0, time.length());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        aTime.setValue(chrononTimeInMs);
-        timeSerde.serialize(aTime, out);
-    }
-
-    protected void parseDate(String date, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMs = 0;
-        try {
-            chrononTimeInMs = ADateParserFactory.parseDatePart(date, 0, date.length());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        short temp = 0;
-        if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
-            temp = 1;
-        }
-        aDate.setValue((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - temp);
-        dateSerde.serialize(aDate, out);
-    }
-
-    protected void parseDateTime(String datetime, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMs = 0;
-        try {
-            // +1 if it is negative (-)
-            short timeOffset = (short) ((datetime.charAt(0) == '-') ? 1 : 0);
-
-            timeOffset += 8;
-
-            if (datetime.charAt(timeOffset) != 'T') {
-                timeOffset += 2;
-                if (datetime.charAt(timeOffset) != 'T') {
-                    throw new AlgebricksException("This can not be an instance of datetime: missing T");
-                }
-            }
-            chrononTimeInMs = ADateParserFactory.parseDatePart(datetime, 0, timeOffset);
-            chrononTimeInMs += ATimeParserFactory.parseTimePart(datetime, timeOffset + 1,
-                    datetime.length() - timeOffset - 1);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        aDateTime.setValue(chrononTimeInMs);
-        datetimeSerde.serialize(aDateTime, out);
-    }
-
-    protected void parseDuration(String duration, DataOutput out) throws HyracksDataException {
-        try {
-            ADurationParserFactory.parseDuration(duration, 0, duration.length(), aDuration, ADurationParseOption.All);
-            durationSerde.serialize(aDuration, out);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    protected void parseDateTimeDuration(String durationString, DataOutput out) throws HyracksDataException {
-        try {
-            ADurationParserFactory.parseDuration(durationString, 0, durationString.length(), aDayTimeDuration,
-                    ADurationParseOption.All);
-            dayTimeDurationSerde.serialize(aDayTimeDuration, out);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    protected void parseYearMonthDuration(String durationString, DataOutput out) throws HyracksDataException {
-        try {
-            ADurationParserFactory.parseDuration(durationString, 0, durationString.length(), aYearMonthDuration,
-                    ADurationParseOption.All);
-            yearMonthDurationSerde.serialize(aYearMonthDuration, out);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    protected void parsePoint(String point, DataOutput out) throws HyracksDataException {
-        try {
-            aPoint.setValue(Double.parseDouble(point.substring(0, point.indexOf(','))),
-                    Double.parseDouble(point.substring(point.indexOf(',') + 1, point.length())));
-            pointSerde.serialize(aPoint, out);
-        } catch (HyracksDataException e) {
-            throw new HyracksDataException(point + " can not be an instance of point");
-        }
-    }
-
-    protected void parse3DPoint(String point3d, DataOutput out) throws HyracksDataException {
-        try {
-            int firstCommaIndex = point3d.indexOf(',');
-            int secondCommaIndex = point3d.indexOf(',', firstCommaIndex + 1);
-            aPoint3D.setValue(Double.parseDouble(point3d.substring(0, firstCommaIndex)),
-                    Double.parseDouble(point3d.substring(firstCommaIndex + 1, secondCommaIndex)),
-                    Double.parseDouble(point3d.substring(secondCommaIndex + 1, point3d.length())));
-            point3DSerde.serialize(aPoint3D, out);
-        } catch (HyracksDataException e) {
-            throw new HyracksDataException(point3d + " can not be an instance of point3d");
-        }
-    }
-
-    protected void parseCircle(String circle, DataOutput out) throws HyracksDataException {
-        try {
-            String[] parts = circle.split(" ");
-            aPoint.setValue(Double.parseDouble(parts[0].split(",")[0]), Double.parseDouble(parts[0].split(",")[1]));
-            aCircle.setValue(aPoint, Double.parseDouble(parts[1].substring(0, parts[1].length())));
-            circleSerde.serialize(aCircle, out);
-        } catch (HyracksDataException e) {
-            throw new HyracksDataException(circle + " can not be an instance of circle");
-        }
-    }
-
-    protected void parseRectangle(String rectangle, DataOutput out) throws HyracksDataException {
-        try {
-            String[] points = rectangle.split(" ");
-            if (points.length != 2) {
-                throw new HyracksDataException("rectangle consists of only 2 points.");
-            }
-            aPoint.setValue(Double.parseDouble(points[0].split(",")[0]), Double.parseDouble(points[0].split(",")[1]));
-            aPoint2.setValue(Double.parseDouble(points[1].split(",")[0]), Double.parseDouble(points[1].split(",")[1]));
-            if (aPoint.getX() > aPoint2.getX() && aPoint.getY() > aPoint2.getY()) {
-                aRectangle.setValue(aPoint2, aPoint);
-            } else if (aPoint.getX() < aPoint2.getX() && aPoint.getY() < aPoint2.getY()) {
-                aRectangle.setValue(aPoint, aPoint2);
-            } else {
-                throw new IllegalArgumentException(
-                        "Rectangle arugment must be either (bottom left point, top right point) or (top right point, bottom left point)");
-            }
-            rectangleSerde.serialize(aRectangle, out);
-        } catch (HyracksDataException e) {
-            throw new HyracksDataException(rectangle + " can not be an instance of rectangle");
-        }
-    }
-
-    protected void parseLine(String line, DataOutput out) throws HyracksDataException {
-        try {
-            String[] points = line.split(" ");
-            if (points.length != 2) {
-                throw new HyracksDataException("line consists of only 2 points.");
-            }
-            aPoint.setValue(Double.parseDouble(points[0].split(",")[0]), Double.parseDouble(points[0].split(",")[1]));
-            aPoint2.setValue(Double.parseDouble(points[1].split(",")[0]), Double.parseDouble(points[1].split(",")[1]));
-            aLine.setValue(aPoint, aPoint2);
-            lineSerde.serialize(aLine, out);
-        } catch (HyracksDataException e) {
-            throw new HyracksDataException(line + " can not be an instance of line");
-        }
-    }
-
-    protected void parseHexBinaryString(char[] input, int start, int length, DataOutput out)
-            throws HyracksDataException {
-        hexParser.generateByteArrayFromHexString(input, start, length);
-        aBinary.setValue(hexParser.getByteArray(), 0, hexParser.getLength());
-        binarySerde.serialize(aBinary, out);
-    }
-
-    protected void parseBase64BinaryString(char[] input, int start, int length, DataOutput out)
-            throws HyracksDataException {
-        base64Parser.generatePureByteArrayFromBase64String(input, start, length);
-        aBinary.setValue(base64Parser.getByteArray(), 0, base64Parser.getLength());
-        binarySerde.serialize(aBinary, out);
-    }
-
-    protected void parseDateTimeInterval(String interval, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMsStart = 0;
-        long chrononTimeInMsEnd = 0;
-        try {
-            // the starting point for parsing (so for the accessor)
-            int startOffset = 0;
-            int endOffset, timeSeperatorOffsetInDatetimeString;
-
-            // Get the index for the comma
-            int commaIndex = interval.indexOf(',');
-            if (commaIndex < 1) {
-                throw new AlgebricksException("comma is missing for a string of interval");
-            }
-
-            endOffset = commaIndex - 1;
-            timeSeperatorOffsetInDatetimeString = interval.indexOf('T');
-
-            if (timeSeperatorOffsetInDatetimeString < 0) {
-                throw new AlgebricksException(
-                        "This can not be an instance of interval: missing T for a datetime value.");
-            }
-
-            chrononTimeInMsStart = parseDatePart(interval, startOffset, timeSeperatorOffsetInDatetimeString - 1);
-
-            chrononTimeInMsStart += parseTimePart(interval, timeSeperatorOffsetInDatetimeString + 1, endOffset);
-
-            // Interval End
-            startOffset = commaIndex + 1;
-            endOffset = interval.length() - 1;
-
-            timeSeperatorOffsetInDatetimeString = interval.indexOf('T', startOffset);
-
-            if (timeSeperatorOffsetInDatetimeString < 0) {
-                throw new AlgebricksException(
-                        "This can not be an instance of interval: missing T for a datetime value.");
-            }
-
-            chrononTimeInMsEnd = parseDatePart(interval, startOffset, timeSeperatorOffsetInDatetimeString - 1);
-
-            chrononTimeInMsEnd += parseTimePart(interval, timeSeperatorOffsetInDatetimeString + 1, endOffset);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        try {
-            aInterval.setValue(chrononTimeInMsStart, chrononTimeInMsEnd, ATypeTag.DATETIME.serialize());
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-
-        intervalSerde.serialize(aInterval, out);
-    }
-
-    protected void parseTimeInterval(String interval, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMsStart = 0;
-        long chrononTimeInMsEnd = 0;
-        try {
-            int startOffset = 0;
-            int endOffset;
-
-            // Get the index for the comma
-            int commaIndex = interval.indexOf(',');
-            if (commaIndex < 0) {
-                throw new AlgebricksException("comma is missing for a string of interval");
-            }
-
-            endOffset = commaIndex - 1;
-            // Interval Start
-            chrononTimeInMsStart = parseTimePart(interval, startOffset, endOffset);
-
-            if (chrononTimeInMsStart < 0) {
-                chrononTimeInMsStart += GregorianCalendarSystem.CHRONON_OF_DAY;
-            }
-
-            // Interval End
-            startOffset = commaIndex + 1;
-            endOffset = interval.length() - 1;
-
-            chrononTimeInMsEnd = parseTimePart(interval, startOffset, endOffset);
-            if (chrononTimeInMsEnd < 0) {
-                chrononTimeInMsEnd += GregorianCalendarSystem.CHRONON_OF_DAY;
-            }
-
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        try {
-            aInterval.setValue(chrononTimeInMsStart, chrononTimeInMsEnd, ATypeTag.TIME.serialize());
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-        intervalSerde.serialize(aInterval, out);
-    }
-
-    protected void parseDateInterval(String interval, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMsStart = 0;
-        long chrononTimeInMsEnd = 0;
-        try {
-            // the starting point for parsing (so for the accessor)
-            int startOffset = 0;
-            int endOffset;
-
-            // Get the index for the comma
-            int commaIndex = interval.indexOf(',');
-            if (commaIndex < 1) {
-                throw new AlgebricksException("comma is missing for a string of interval");
-            }
-
-            endOffset = commaIndex - 1;
-            chrononTimeInMsStart = parseDatePart(interval, startOffset, endOffset);
-
-            // Interval End
-            startOffset = commaIndex + 1;
-            endOffset = interval.length() - 1;
-
-            chrononTimeInMsEnd = parseDatePart(interval, startOffset, endOffset);
-
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        try {
-            aInterval.setValue((chrononTimeInMsStart / GregorianCalendarSystem.CHRONON_OF_DAY),
-                    (chrononTimeInMsEnd / GregorianCalendarSystem.CHRONON_OF_DAY), ATypeTag.DATE.serialize());
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-        intervalSerde.serialize(aInterval, out);
-    }
-
-    private long parseDatePart(String interval, int startOffset, int endOffset)
-            throws AlgebricksException, HyracksDataException {
-
-        while (interval.charAt(endOffset) == '"' || interval.charAt(endOffset) == ' ') {
-            endOffset--;
-        }
-
-        while (interval.charAt(startOffset) == '"' || interval.charAt(startOffset) == ' ') {
-            startOffset++;
-        }
-
-        return ADateParserFactory.parseDatePart(interval, startOffset, endOffset - startOffset + 1);
-    }
-
-    private int parseTimePart(String interval, int startOffset, int endOffset)
-            throws AlgebricksException, HyracksDataException {
-
-        while (interval.charAt(endOffset) == '"' || interval.charAt(endOffset) == ' ') {
-            endOffset--;
-        }
-
-        while (interval.charAt(startOffset) == '"' || interval.charAt(startOffset) == ' ') {
-            startOffset++;
-        }
-
-        return ATimeParserFactory.parseTimePart(interval, startOffset, endOffset - startOffset + 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
deleted file mode 100644
index f3199e9..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-/**
- * An abstract class implementation for ITupleParser. It provides common
- * functionality involved in parsing data in an external format and packing
- * frames with formed tuples.
- */
-public abstract class AbstractTupleParser implements ITupleParser {
-
-    protected static Logger LOGGER = Logger.getLogger(AbstractTupleParser.class.getName());
-
-    protected ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-    protected DataOutput dos = tb.getDataOutput();
-    protected final ARecordType recType;
-    protected final IHyracksCommonContext ctx;
-
-    public AbstractTupleParser(IHyracksCommonContext ctx, ARecordType recType) throws HyracksDataException {
-        this.recType = recType;
-        this.ctx = ctx;
-    }
-
-    public abstract IDataParser getDataParser();
-
-    public abstract ITupleForwardPolicy getTupleParserPolicy();
-
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        IDataParser parser = getDataParser();
-        ITupleForwardPolicy policy = getTupleParserPolicy();
-        try {
-            parser.initialize(in, recType, true);
-            policy.initialize(ctx, writer);
-            while (true) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                policy.addTuple(tb);
-            }
-            policy.close();
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
deleted file mode 100644
index 2053a75..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.common.parse.ITupleForwardPolicy.TupleForwardPolicyType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class AsterixTupleParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    public static enum InputDataFormat {
-        ADM,
-        DELIMITED,
-        UNKNOWN
-    }
-
-    public static final String HAS_HEADER = "has.header";
-    public static final String KEY_FORMAT = "format";
-    public static final String FORMAT_ADM = "adm";
-    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
-    public static final String FORMAT_BINARY = "binary";
-
-    public static final String KEY_PATH = "path";
-    public static final String KEY_SOURCE_DATATYPE = "type-name";
-    public static final String KEY_DELIMITER = "delimiter";
-    public static final String KEY_PARSER_FACTORY = "parser";
-    public static final String KEY_HEADER = "header";
-    public static final String KEY_QUOTE = "quote";
-    public static final String TIME_TRACKING = "time.tracking";
-    public static final String DEFAULT_QUOTE = "\"";
-    public static final String AT_LEAST_ONE_SEMANTICS = FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS;
-    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-    public static final String DEFAULT_DELIMITER = ",";
-
-    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
-    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
-        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-        return m;
-    }
-
-    private final ARecordType recordType;
-    private final Map<String, String> configuration;
-    private final InputDataFormat inputDataFormat;
-
-    public AsterixTupleParserFactory(Map<String, String> configuration, ARecordType recType, InputDataFormat dataFormat) {
-        this.recordType = recType;
-        this.configuration = configuration;
-        this.inputDataFormat = dataFormat;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException {
-        ITupleParser tupleParser = null;
-        try {
-            String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
-            ITupleParserFactory parserFactory = null;
-            if (parserFactoryClassname != null) {
-                parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname).newInstance();
-                tupleParser = parserFactory.createTupleParser(ctx);
-            } else {
-                IDataParser dataParser = null;
-                dataParser = createDataParser(ctx);
-                ITupleForwardPolicy policy = getTupleParserPolicy(configuration);
-                policy.configure(configuration);
-                tupleParser = new GenericTupleParser(ctx, recordType, dataParser, policy);
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        return tupleParser;
-    }
-
-    private static class GenericTupleParser extends AbstractTupleParser {
-
-        private final IDataParser dataParser;
-
-        private final ITupleForwardPolicy policy;
-
-        public GenericTupleParser(IHyracksCommonContext ctx, ARecordType recType, IDataParser dataParser,
-                ITupleForwardPolicy policy) throws HyracksDataException {
-            super(ctx, recType);
-            this.dataParser = dataParser;
-            this.policy = policy;
-        }
-
-        @Override
-        public IDataParser getDataParser() {
-            return dataParser;
-        }
-
-        @Override
-        public ITupleForwardPolicy getTupleParserPolicy() {
-            return policy;
-        }
-
-    }
-
-    private IDataParser createDataParser(IHyracksCommonContext ctx) throws Exception {
-        IDataParser dataParser = null;
-        switch (inputDataFormat) {
-            case ADM:
-                dataParser = new ADMDataParser();
-                break;
-            case DELIMITED:
-                dataParser = configureDelimitedDataParser(ctx);
-                break;
-            case UNKNOWN:
-                String specifiedFormat = (String) configuration.get(KEY_FORMAT);
-                if (specifiedFormat == null) {
-                    throw new IllegalArgumentException(" Unspecified data format");
-                } else {
-                    if (FORMAT_ADM.equalsIgnoreCase(specifiedFormat.toUpperCase())) {
-                        dataParser = new ADMDataParser();
-                    } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat.toUpperCase())) {
-                        dataParser = configureDelimitedDataParser(ctx);
-                    } else {
-                        throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT)
-                                + " not supported");
-                    }
-                }
-        }
-        return dataParser;
-    }
-
-    public static ITupleForwardPolicy getTupleParserPolicy(Map<String, String> configuration) {
-        ITupleForwardPolicy policy = null;
-        ITupleForwardPolicy.TupleForwardPolicyType policyType = null;
-        String propValue = configuration.get(ITupleForwardPolicy.PARSER_POLICY);
-        if (propValue == null) {
-            policyType = TupleForwardPolicyType.FRAME_FULL;
-        } else {
-            policyType = TupleForwardPolicyType.valueOf(propValue.trim().toUpperCase());
-        }
-        switch (policyType) {
-            case FRAME_FULL:
-                policy = new FrameFullTupleForwardPolicy();
-                break;
-            case COUNTER_TIMER_EXPIRED:
-                policy = new CounterTimerTupleForwardPolicy();
-                break;
-            case RATE_CONTROLLED:
-                policy = new RateControlledTupleForwardPolicy();
-                break;
-        }
-        return policy;
-    }
-
-    private IDataParser configureDelimitedDataParser(IHyracksCommonContext ctx) throws AsterixException {
-        IValueParserFactory[] valueParserFactories = getValueParserFactories();
-        Character delimiter = getDelimiter(configuration);
-        char quote = getQuote(configuration, delimiter);
-        boolean hasHeader = hasHeader();
-        return new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
-    }
-  
-
-    private boolean hasHeader() {
-        String value = configuration.get(KEY_HEADER);
-        if (value != null) {
-            return Boolean.valueOf(value);
-        }
-        return false;
-    }
-
-    private IValueParserFactory[] getValueParserFactories() {
-        int n = recordType.getFieldTypes().length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag = null;
-            if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
-                List<IAType> unionTypes = ((AUnionType) recordType.getFieldTypes()[i]).getUnionList();
-                if (unionTypes.size() != 2 && unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
-                    throw new NotImplementedException("Non-optional UNION type is not supported.");
-                }
-                tag = unionTypes.get(1).getTypeTag();
-            } else {
-                tag = recordType.getFieldTypes()[i].getTypeTag();
-            }
-            if (tag == null) {
-                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
-            }
-            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-        }
-        return fieldParserFactories;
-    }
-
-    // Get a delimiter from the given configuration
-    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
-        String delimiterValue = configuration.get(AsterixTupleParserFactory.KEY_DELIMITER);
-        if (delimiterValue == null) {
-            delimiterValue = AsterixTupleParserFactory.DEFAULT_DELIMITER;
-        } else if (delimiterValue.length() != 1) {
-            throw new AsterixException("'" + delimiterValue
-                    + "' is not a valid delimiter. The length of a delimiter should be 1.");
-        }
-        return delimiterValue.charAt(0);
-    }
-
-    // Get a quote from the given configuration when the delimiter is given
-    // Need to pass delimiter to check whether they share the same character
-    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
-        String quoteValue = configuration.get(AsterixTupleParserFactory.KEY_QUOTE);
-        if (quoteValue == null) {
-            quoteValue = AsterixTupleParserFactory.DEFAULT_QUOTE;
-        } else if (quoteValue.length() != 1) {
-            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
-        }
-
-        // Since delimiter (char type value) can't be null,
-        // we only check whether delimiter and quote use the same character
-        if (quoteValue.charAt(0) == delimiter) {
-            throw new AsterixException("Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter
-                    + "'. ");
-        }
-
-        return quoteValue.charAt(0);
-    }
-
-    // Get the header flag
-    public static boolean getHasHeader(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(AsterixTupleParserFactory.KEY_HEADER));
-    }
-
-}