You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:02 UTC
[04/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
deleted file mode 100644
index 6e4c175..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/ADMDataParser.java
+++ /dev/null
@@ -1,1100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.asterix.builders.AbvsBuilderFactory;
-import org.apache.asterix.builders.IARecordBuilder;
-import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.builders.ListBuilderFactory;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.RecordBuilderFactory;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.dataflow.data.nontagged.serde.APolygonSerializerDeserializer;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AUnorderedListType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.om.types.hierachy.ITypeConvertComputer;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.om.util.container.IObjectPool;
-import org.apache.asterix.om.util.container.ListObjectPool;
-import org.apache.asterix.runtime.operators.file.adm.AdmLexer;
-import org.apache.asterix.runtime.operators.file.adm.AdmLexerException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IMutableValueStorage;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-
-/**
- * Parser for ADM formatted data.
- */
-public class ADMDataParser extends AbstractDataParser {
-
- protected AdmLexer admLexer;
- protected ARecordType recordType;
- protected boolean datasetRec;
-
- private int nullableFieldId = 0;
- private ArrayBackedValueStorage castBuffer = new ArrayBackedValueStorage();
-
- private IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<IARecordBuilder, ATypeTag>(
- new RecordBuilderFactory());
- private IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<IAsterixListBuilder, ATypeTag>(
- new ListBuilderFactory());
- private IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<IMutableValueStorage, ATypeTag>(
- new AbvsBuilderFactory());
-
- private String mismatchErrorMessage = "Mismatch Type, expecting a value of type ";
- private String mismatchErrorMessage2 = " got a value of type ";
-
- static class ParseException extends AsterixException {
- private static final long serialVersionUID = 1L;
- private String filename;
- private int line = -1;
- private int column = -1;
-
- public ParseException(String message) {
- super(message);
- }
-
- public ParseException(Throwable cause) {
- super(cause);
- }
-
- public ParseException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ParseException(Throwable cause, String filename, int line, int column) {
- super(cause);
- setLocation(filename, line, column);
- }
-
- public void setLocation(String filename, int line, int column) {
- this.filename = filename;
- this.line = line;
- this.column = column;
- }
-
- @Override
- public String getMessage() {
- StringBuilder msg = new StringBuilder("Parse error");
- if (filename != null) {
- msg.append(" in file " + filename);
- }
- if (line >= 0) {
- if (column >= 0) {
- msg.append(" at (" + line + ", " + column + ")");
- } else {
- msg.append(" in line " + line);
- }
- }
- return msg.append(": " + super.getMessage()).toString();
- }
- }
-
- public ADMDataParser() {
- this(null);
- }
-
- public ADMDataParser(String filename) {
- this.filename = filename;
- }
-
- @Override
- public boolean parse(DataOutput out) throws AsterixException {
- try {
- resetPools();
- return parseAdmInstance(recordType, datasetRec, out);
- } catch (IOException e) {
- throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
- } catch (AdmLexerException e) {
- throw new AsterixException(e);
- } catch (ParseException e) {
- e.setLocation(filename, admLexer.getLine(), admLexer.getColumn());
- throw e;
- }
- }
-
- @Override
- public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException {
- this.recordType = recordType;
- this.datasetRec = datasetRec;
- try {
- admLexer = new AdmLexer(new java.io.InputStreamReader(in));
- } catch (IOException e) {
- throw new ParseException(e);
- }
- }
-
- protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
- throws AsterixException, IOException, AdmLexerException {
- int token = admLexer.next();
- if (token == AdmLexer.TOKEN_EOF) {
- return false;
- } else {
- admFromLexerStream(token, objectType, out, datasetRec);
- return true;
- }
- }
-
- private void admFromLexerStream(int token, IAType objectType, DataOutput out, Boolean datasetRec)
- throws AsterixException, IOException, AdmLexerException {
-
- switch (token) {
- case AdmLexer.TOKEN_NULL_LITERAL: {
- if (checkType(ATypeTag.NULL, objectType)) {
- nullSerde.serialize(ANull.NULL, out);
- } else {
- throw new ParseException("This field can not be null");
- }
- break;
- }
- case AdmLexer.TOKEN_TRUE_LITERAL: {
- if (checkType(ATypeTag.BOOLEAN, objectType)) {
- booleanSerde.serialize(ABoolean.TRUE, out);
- } else {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
- }
- break;
- }
- case AdmLexer.TOKEN_BOOLEAN_CONS: {
- parseConstructor(ATypeTag.BOOLEAN, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_FALSE_LITERAL: {
- if (checkType(ATypeTag.BOOLEAN, objectType)) {
- booleanSerde.serialize(ABoolean.FALSE, out);
- } else {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
- }
- break;
- }
- case AdmLexer.TOKEN_DOUBLE_LITERAL: {
- parseToNumericTarget(ATypeTag.DOUBLE, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_DOUBLE_CONS: {
- parseConstructor(ATypeTag.DOUBLE, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_FLOAT_LITERAL: {
- parseToNumericTarget(ATypeTag.FLOAT, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_FLOAT_CONS: {
- parseConstructor(ATypeTag.FLOAT, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT8_LITERAL: {
- parseAndCastNumeric(ATypeTag.INT8, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT8_CONS: {
- parseConstructor(ATypeTag.INT8, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT16_LITERAL: {
- parseAndCastNumeric(ATypeTag.INT16, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT16_CONS: {
- parseConstructor(ATypeTag.INT16, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT_LITERAL: {
- // For an INT value without any suffix, we return it as INT64 type value since it is the default integer type.
- parseAndCastNumeric(ATypeTag.INT64, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT32_LITERAL: {
- parseAndCastNumeric(ATypeTag.INT32, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT32_CONS: {
- parseConstructor(ATypeTag.INT32, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT64_LITERAL: {
- parseAndCastNumeric(ATypeTag.INT64, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INT64_CONS: {
- parseConstructor(ATypeTag.INT64, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_STRING_LITERAL: {
- if (checkType(ATypeTag.STRING, objectType)) {
- final String tokenImage = admLexer.getLastTokenImage().substring(1,
- admLexer.getLastTokenImage().length() - 1);
- aString.setValue(admLexer.containsEscapes() ? replaceEscapes(tokenImage) : tokenImage);
- stringSerde.serialize(aString, out);
- } else if (checkType(ATypeTag.UUID, objectType)) {
- // Dealing with UUID type that is represented by a string
- final String tokenImage = admLexer.getLastTokenImage().substring(1,
- admLexer.getLastTokenImage().length() - 1);
- aUUID.fromStringToAMuatbleUUID(tokenImage);
- uuidSerde.serialize(aUUID, out);
- } else {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
- }
- break;
- }
- case AdmLexer.TOKEN_STRING_CONS: {
- parseConstructor(ATypeTag.STRING, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_HEX_CONS:
- case AdmLexer.TOKEN_BASE64_CONS: {
- if (checkType(ATypeTag.BINARY, objectType)) {
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
- if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
- parseToBinaryTarget(token, admLexer.getLastTokenImage(), out);
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
- break;
- }
- }
- }
- }
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName());
- }
- case AdmLexer.TOKEN_DATE_CONS: {
- parseConstructor(ATypeTag.DATE, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_TIME_CONS: {
- parseConstructor(ATypeTag.TIME, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_DATETIME_CONS: {
- parseConstructor(ATypeTag.DATETIME, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_INTERVAL_DATE_CONS: {
- if (checkType(ATypeTag.INTERVAL, objectType)) {
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
- if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
- parseDateInterval(admLexer.getLastTokenImage(), out);
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
- break;
- }
- }
- }
- }
- throw new ParseException("Wrong interval data parsing for date interval.");
- }
- case AdmLexer.TOKEN_INTERVAL_TIME_CONS: {
- if (checkType(ATypeTag.INTERVAL, objectType)) {
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
- if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
- parseTimeInterval(admLexer.getLastTokenImage(), out);
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
- break;
- }
- }
- }
- }
- throw new ParseException("Wrong interval data parsing for time interval.");
- }
- case AdmLexer.TOKEN_INTERVAL_DATETIME_CONS: {
- if (checkType(ATypeTag.INTERVAL, objectType)) {
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
- if (admLexer.next() == AdmLexer.TOKEN_STRING_LITERAL) {
- parseDateTimeInterval(admLexer.getLastTokenImage(), out);
- if (admLexer.next() == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
- break;
- }
- }
- }
- }
- throw new ParseException("Wrong interval data parsing for datetime interval.");
- }
- case AdmLexer.TOKEN_DURATION_CONS: {
- parseConstructor(ATypeTag.DURATION, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_YEAR_MONTH_DURATION_CONS: {
- parseConstructor(ATypeTag.YEARMONTHDURATION, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_DAY_TIME_DURATION_CONS: {
- parseConstructor(ATypeTag.DAYTIMEDURATION, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_POINT_CONS: {
- parseConstructor(ATypeTag.POINT, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_POINT3D_CONS: {
- parseConstructor(ATypeTag.POINT3D, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_CIRCLE_CONS: {
- parseConstructor(ATypeTag.CIRCLE, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_RECTANGLE_CONS: {
- parseConstructor(ATypeTag.RECTANGLE, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_LINE_CONS: {
- parseConstructor(ATypeTag.LINE, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_POLYGON_CONS: {
- parseConstructor(ATypeTag.POLYGON, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_START_UNORDERED_LIST: {
- if (checkType(ATypeTag.UNORDEREDLIST, objectType)) {
- objectType = getComplexType(objectType, ATypeTag.UNORDEREDLIST);
- parseUnorderedList((AUnorderedListType) objectType, out);
- } else {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
- }
- break;
- }
-
- case AdmLexer.TOKEN_START_ORDERED_LIST: {
- if (checkType(ATypeTag.ORDEREDLIST, objectType)) {
- objectType = getComplexType(objectType, ATypeTag.ORDEREDLIST);
- parseOrderedList((AOrderedListType) objectType, out);
- } else {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
- }
- break;
- }
- case AdmLexer.TOKEN_START_RECORD: {
- if (checkType(ATypeTag.RECORD, objectType)) {
- objectType = getComplexType(objectType, ATypeTag.RECORD);
- parseRecord((ARecordType) objectType, out, datasetRec);
- } else {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
- }
- break;
- }
- case AdmLexer.TOKEN_UUID_CONS: {
- parseConstructor(ATypeTag.UUID, objectType, out);
- break;
- }
- case AdmLexer.TOKEN_EOF: {
- break;
- }
- default: {
- throw new ParseException("Unexpected ADM token kind: " + AdmLexer.tokenKindToString(token) + ".");
- }
- }
-
- }
-
- private String replaceEscapes(String tokenImage) throws ParseException {
- char[] chars = tokenImage.toCharArray();
- int len = chars.length;
- int readpos = 0;
- int writepos = 0;
- int movemarker = 0;
- while (readpos < len) {
- if (chars[readpos] == '\\') {
- moveChars(chars, movemarker, readpos, readpos - writepos);
- switch (chars[readpos + 1]) {
- case '\\':
- case '\"':
- case '/':
- chars[writepos] = chars[readpos + 1];
- break;
- case 'b':
- chars[writepos] = '\b';
- break;
- case 'f':
- chars[writepos] = '\f';
- break;
- case 'n':
- chars[writepos] = '\n';
- break;
- case 'r':
- chars[writepos] = '\r';
- break;
- case 't':
- chars[writepos] = '\t';
- break;
- case 'u':
- chars[writepos] = (char) Integer.parseInt(new String(chars, readpos + 2, 4), 16);
- readpos += 4;
- break;
- default:
- throw new ParseException("Illegal escape '\\" + chars[readpos + 1] + "'");
- }
- ++readpos;
- movemarker = readpos + 1;
- }
- ++writepos;
- ++readpos;
- }
- moveChars(chars, movemarker, len, readpos - writepos);
- return new String(chars, 0, len - (readpos - writepos));
- }
-
- private static void moveChars(final char[] chars, final int start, final int end, final int offset) {
- if (offset == 0) {
- return;
- }
- for (int i = start; i < end; ++i) {
- chars[i - offset] = chars[i];
- }
- }
-
- private IAType getComplexType(IAType aObjectType, ATypeTag tag) {
- if (aObjectType == null) {
- return null;
- }
-
- if (aObjectType.getTypeTag() == tag) {
- return aObjectType;
- }
-
- if (aObjectType.getTypeTag() == ATypeTag.UNION) {
- List<IAType> unionList = ((AUnionType) aObjectType).getUnionList();
- for (int i = 0; i < unionList.size(); i++) {
- if (unionList.get(i).getTypeTag() == tag) {
- return unionList.get(i);
- }
- }
- }
- return null; // wont get here
- }
-
- private ATypeTag getTargetTypeTag(ATypeTag expectedTypeTag, IAType aObjectType) throws IOException {
- if (aObjectType == null) {
- return expectedTypeTag;
- }
- if (aObjectType.getTypeTag() != ATypeTag.UNION) {
- final ATypeTag typeTag = aObjectType.getTypeTag();
- if (ATypeHierarchy.canPromote(expectedTypeTag, typeTag)
- || ATypeHierarchy.canDemote(expectedTypeTag, typeTag)) {
- return typeTag;
- } else {
- return null;
- }
- // return ATypeHierarchy.canPromote(expectedTypeTag, typeTag) ? typeTag : null;
- } else { // union
- List<IAType> unionList = ((AUnionType) aObjectType).getUnionList();
- for (IAType t : unionList) {
- final ATypeTag typeTag = t.getTypeTag();
- if (ATypeHierarchy.canPromote(expectedTypeTag, typeTag)
- || ATypeHierarchy.canDemote(expectedTypeTag, typeTag)) {
- return typeTag;
- }
- }
- }
- return null;
- }
-
- private boolean checkType(ATypeTag expectedTypeTag, IAType aObjectType) throws IOException {
- return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
- }
-
- private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
- throws IOException, AsterixException, AdmLexerException {
-
- ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
- ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
- IARecordBuilder recBuilder = getRecordBuilder();
-
- BitSet nulls = null;
- if (datasetRec) {
- if (recType != null) {
- nulls = new BitSet(recType.getFieldNames().length);
- recBuilder.reset(recType);
- } else {
- recBuilder.reset(null);
- }
- } else if (recType != null) {
- nulls = new BitSet(recType.getFieldNames().length);
- recBuilder.reset(recType);
- } else {
- recBuilder.reset(null);
- }
-
- recBuilder.init();
- int token;
- boolean inRecord = true;
- boolean expectingRecordField = false;
- boolean first = true;
-
- Boolean openRecordField = false;
- int fieldId = 0;
- IAType fieldType = null;
- do {
- token = admLexer.next();
- switch (token) {
- case AdmLexer.TOKEN_END_RECORD: {
- if (expectingRecordField) {
- throw new ParseException("Found END_RECORD while expecting a record field.");
- }
- inRecord = false;
- break;
- }
- case AdmLexer.TOKEN_STRING_LITERAL: {
- // we've read the name of the field
- // now read the content
- fieldNameBuffer.reset();
- fieldValueBuffer.reset();
- expectingRecordField = false;
-
- if (recType != null) {
- String fldName = admLexer.getLastTokenImage().substring(1,
- admLexer.getLastTokenImage().length() - 1);
- fieldId = recBuilder.getFieldId(fldName);
- if (fieldId < 0 && !recType.isOpen()) {
- throw new ParseException("This record is closed, you can not add extra fields !!");
- } else if (fieldId < 0 && recType.isOpen()) {
- aStringFieldName.setValue(admLexer.getLastTokenImage().substring(1,
- admLexer.getLastTokenImage().length() - 1));
- stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
- openRecordField = true;
- fieldType = null;
- } else {
- // a closed field
- nulls.set(fieldId);
- fieldType = recType.getFieldTypes()[fieldId];
- openRecordField = false;
- }
- } else {
- aStringFieldName.setValue(
- admLexer.getLastTokenImage().substring(1, admLexer.getLastTokenImage().length() - 1));
- stringSerde.serialize(aStringFieldName, fieldNameBuffer.getDataOutput());
- openRecordField = true;
- fieldType = null;
- }
-
- token = admLexer.next();
- if (token != AdmLexer.TOKEN_COLON) {
- throw new ParseException("Unexpected ADM token kind: " + AdmLexer.tokenKindToString(token)
- + " while expecting \":\".");
- }
-
- token = admLexer.next();
- this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput(), false);
- if (openRecordField) {
- if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
- recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
- }
- } else if (NonTaggedFormatUtil.isOptional(recType)) {
- if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
- recBuilder.addField(fieldId, fieldValueBuffer);
- }
- } else {
- recBuilder.addField(fieldId, fieldValueBuffer);
- }
-
- break;
- }
- case AdmLexer.TOKEN_COMMA: {
- if (first) {
- throw new ParseException("Found COMMA before any record field.");
- }
- if (expectingRecordField) {
- throw new ParseException("Found COMMA while expecting a record field.");
- }
- expectingRecordField = true;
- break;
- }
- default: {
- throw new ParseException("Unexpected ADM token kind: " + AdmLexer.tokenKindToString(token)
- + " while parsing record fields.");
- }
- }
- first = false;
- } while (inRecord);
-
- if (recType != null) {
- nullableFieldId = checkNullConstraints(recType, nulls);
- if (nullableFieldId != -1) {
- throw new ParseException("Field: " + recType.getFieldNames()[nullableFieldId] + " can not be null");
- }
- }
- recBuilder.write(out, true);
- }
-
- private int checkNullConstraints(ARecordType recType, BitSet nulls) {
- boolean isNull = false;
- for (int i = 0; i < recType.getFieldTypes().length; i++) {
- if (nulls.get(i) == false) {
- IAType type = recType.getFieldTypes()[i];
- if (type.getTypeTag() != ATypeTag.NULL && type.getTypeTag() != ATypeTag.UNION) {
- return i;
- }
-
- if (type.getTypeTag() == ATypeTag.UNION) { // union
- List<IAType> unionList = ((AUnionType) type).getUnionList();
- for (int j = 0; j < unionList.size(); j++) {
- if (unionList.get(j).getTypeTag() == ATypeTag.NULL) {
- isNull = true;
- break;
- }
- }
- if (!isNull) {
- return i;
- }
- }
- }
- }
- return -1;
- }
-
- private void parseOrderedList(AOrderedListType oltype, DataOutput out)
- throws IOException, AsterixException, AdmLexerException {
- ArrayBackedValueStorage itemBuffer = getTempBuffer();
- OrderedListBuilder orderedListBuilder = (OrderedListBuilder) getOrderedListBuilder();
-
- IAType itemType = null;
- if (oltype != null) {
- itemType = oltype.getItemType();
- }
- orderedListBuilder.reset(oltype);
-
- int token;
- boolean inList = true;
- boolean expectingListItem = false;
- boolean first = true;
- do {
- token = admLexer.next();
- if (token == AdmLexer.TOKEN_END_ORDERED_LIST) {
- if (expectingListItem) {
- throw new ParseException("Found END_COLLECTION while expecting a list item.");
- }
- inList = false;
- } else if (token == AdmLexer.TOKEN_COMMA) {
- if (first) {
- throw new ParseException("Found COMMA before any list item.");
- }
- if (expectingListItem) {
- throw new ParseException("Found COMMA while expecting a list item.");
- }
- expectingListItem = true;
- } else {
- expectingListItem = false;
- itemBuffer.reset();
-
- admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
- orderedListBuilder.addItem(itemBuffer);
- }
- first = false;
- } while (inList);
- orderedListBuilder.write(out, true);
- }
-
- private void parseUnorderedList(AUnorderedListType uoltype, DataOutput out)
- throws IOException, AsterixException, AdmLexerException {
- ArrayBackedValueStorage itemBuffer = getTempBuffer();
- UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
-
- IAType itemType = null;
-
- if (uoltype != null) {
- itemType = uoltype.getItemType();
- }
- unorderedListBuilder.reset(uoltype);
-
- int token;
- boolean inList = true;
- boolean expectingListItem = false;
- boolean first = true;
- do {
- token = admLexer.next();
- if (token == AdmLexer.TOKEN_END_RECORD) {
- if (admLexer.next() == AdmLexer.TOKEN_END_RECORD) {
- if (expectingListItem) {
- throw new ParseException("Found END_COLLECTION while expecting a list item.");
- } else {
- inList = false;
- }
- } else {
- throw new ParseException("Found END_RECORD while expecting a list item.");
- }
- } else if (token == AdmLexer.TOKEN_COMMA) {
- if (first) {
- throw new ParseException("Found COMMA before any list item.");
- }
- if (expectingListItem) {
- throw new ParseException("Found COMMA while expecting a list item.");
- }
- expectingListItem = true;
- } else {
- expectingListItem = false;
- itemBuffer.reset();
- admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
- unorderedListBuilder.addItem(itemBuffer);
- }
- first = false;
- } while (inList);
- unorderedListBuilder.write(out, true);
- }
-
- private IARecordBuilder getRecordBuilder() {
- return recordBuilderPool.allocate(ATypeTag.RECORD);
- }
-
- private IAsterixListBuilder getOrderedListBuilder() {
- return listBuilderPool.allocate(ATypeTag.ORDEREDLIST);
- }
-
- private IAsterixListBuilder getUnorderedListBuilder() {
- return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST);
- }
-
- private ArrayBackedValueStorage getTempBuffer() {
- return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY);
- }
-
- private void parseToBinaryTarget(int lexerToken, String tokenImage, DataOutput out)
- throws ParseException, HyracksDataException {
- switch (lexerToken) {
- case AdmLexer.TOKEN_HEX_CONS: {
- parseHexBinaryString(tokenImage.toCharArray(), 1, tokenImage.length() - 2, out);
- break;
- }
- case AdmLexer.TOKEN_BASE64_CONS: {
- parseBase64BinaryString(tokenImage.toCharArray(), 1, tokenImage.length() - 2, out);
- break;
- }
- }
- }
-
- private void parseToNumericTarget(ATypeTag typeTag, IAType objectType, DataOutput out)
- throws AsterixException, IOException {
- final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
- if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), targetTypeTag, out)) {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
- }
- }
-
- private void parseAndCastNumeric(ATypeTag typeTag, IAType objectType, DataOutput out)
- throws AsterixException, IOException {
- final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
- DataOutput dataOutput = out;
- if (targetTypeTag != typeTag) {
- castBuffer.reset();
- dataOutput = castBuffer.getDataOutput();
- }
-
- if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), typeTag, dataOutput)) {
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
- }
-
- // If two type tags are not the same, either we try to promote or demote source type to the target type
- if (targetTypeTag != typeTag) {
- if (ATypeHierarchy.canPromote(typeTag, targetTypeTag)) {
- // can promote typeTag to targetTypeTag
- ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag, targetTypeTag);
- if (promoteComputer == null) {
- throw new AsterixException(
- "Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
- }
- // do the promotion; note that the type tag field should be skipped
- promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
- castBuffer.getLength() - 1, out);
- } else if (ATypeHierarchy.canDemote(typeTag, targetTypeTag)) {
- //can demote source type to the target type
- ITypeConvertComputer demoteComputer = ATypeHierarchy.getTypeDemoteComputer(typeTag, targetTypeTag);
- if (demoteComputer == null) {
- throw new AsterixException(
- "Can't cast the " + typeTag + " type to the " + targetTypeTag + " type.");
- }
- // do the demotion; note that the type tag field should be skipped
- demoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
- castBuffer.getLength() - 1, out);
- }
- }
- }
-
- private void parseConstructor(ATypeTag typeTag, IAType objectType, DataOutput out)
- throws AsterixException, AdmLexerException, IOException {
- final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
- if (targetTypeTag != null) {
- DataOutput dataOutput = out;
- if (targetTypeTag != typeTag) {
- castBuffer.reset();
- dataOutput = castBuffer.getDataOutput();
- }
- int token = admLexer.next();
- if (token == AdmLexer.TOKEN_CONSTRUCTOR_OPEN) {
- token = admLexer.next();
- if (token == AdmLexer.TOKEN_STRING_LITERAL) {
- final String unquoted = admLexer.getLastTokenImage().substring(1,
- admLexer.getLastTokenImage().length() - 1);
- if (!parseValue(unquoted, typeTag, dataOutput)) {
- throw new ParseException("Missing deserializer method for constructor: "
- + AdmLexer.tokenKindToString(token) + ".");
- }
- token = admLexer.next();
- if (token == AdmLexer.TOKEN_CONSTRUCTOR_CLOSE) {
- if (targetTypeTag != typeTag) {
- ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag,
- targetTypeTag);
- // the availability if the promote computer should be consistent with the availability of a target type
- assert promoteComputer != null;
- // do the promotion; note that the type tag field should be skipped
- promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
- castBuffer.getLength() - 1, out);
- }
- return;
- }
- }
- }
- }
- throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + ". Got " + typeTag + " instead.");
- }
-
- private boolean parseValue(final String unquoted, ATypeTag typeTag, DataOutput out)
- throws AsterixException, HyracksDataException, IOException {
- switch (typeTag) {
- case BOOLEAN:
- parseBoolean(unquoted, out);
- return true;
- case INT8:
- parseInt8(unquoted, out);
- return true;
- case INT16:
- parseInt16(unquoted, out);
- return true;
- case INT32:
- parseInt32(unquoted, out);
- return true;
- case INT64:
- parseInt64(unquoted, out);
- return true;
- case FLOAT:
- aFloat.setValue(Float.parseFloat(unquoted));
- floatSerde.serialize(aFloat, out);
- return true;
- case DOUBLE:
- aDouble.setValue(Double.parseDouble(unquoted));
- doubleSerde.serialize(aDouble, out);
- return true;
- case STRING:
- aString.setValue(unquoted);
- stringSerde.serialize(aString, out);
- return true;
- case TIME:
- parseTime(unquoted, out);
- return true;
- case DATE:
- parseDate(unquoted, out);
- return true;
- case DATETIME:
- parseDateTime(unquoted, out);
- return true;
- case DURATION:
- parseDuration(unquoted, out);
- return true;
- case DAYTIMEDURATION:
- parseDateTimeDuration(unquoted, out);
- return true;
- case YEARMONTHDURATION:
- parseYearMonthDuration(unquoted, out);
- return true;
- case POINT:
- parsePoint(unquoted, out);
- return true;
- case POINT3D:
- parse3DPoint(unquoted, out);
- return true;
- case CIRCLE:
- parseCircle(unquoted, out);
- return true;
- case RECTANGLE:
- parseRectangle(unquoted, out);
- return true;
- case LINE:
- parseLine(unquoted, out);
- return true;
- case POLYGON:
- APolygonSerializerDeserializer.parse(unquoted, out);
- return true;
- case UUID:
- aUUID.fromStringToAMuatbleUUID(unquoted);
- uuidSerde.serialize(aUUID, out);
- return true;
- default:
- return false;
- }
- }
-
- private void parseBoolean(String bool, DataOutput out) throws AsterixException, HyracksDataException {
- String errorMessage = "This can not be an instance of boolean";
- if (bool.equals("true")) {
- booleanSerde.serialize(ABoolean.TRUE, out);
- } else if (bool.equals("false")) {
- booleanSerde.serialize(ABoolean.FALSE, out);
- } else {
- throw new ParseException(errorMessage);
- }
- }
-
- private void parseInt8(String int8, DataOutput out) throws AsterixException, HyracksDataException {
- String errorMessage = "This can not be an instance of int8";
- boolean positive = true;
- byte value = 0;
- int offset = 0;
-
- if (int8.charAt(offset) == '+') {
- offset++;
- } else if (int8.charAt(offset) == '-') {
- offset++;
- positive = false;
- }
- for (; offset < int8.length(); offset++) {
- if (int8.charAt(offset) >= '0' && int8.charAt(offset) <= '9') {
- value = (byte) (value * 10 + int8.charAt(offset) - '0');
- } else if (int8.charAt(offset) == 'i' && int8.charAt(offset + 1) == '8' && offset + 2 == int8.length()) {
- break;
- } else {
- throw new ParseException(errorMessage);
- }
- }
- if (value < 0) {
- throw new ParseException(errorMessage);
- }
- if (value > 0 && !positive) {
- value *= -1;
- }
- aInt8.setValue(value);
- int8Serde.serialize(aInt8, out);
- }
-
- private void parseInt16(String int16, DataOutput out) throws AsterixException, HyracksDataException {
- String errorMessage = "This can not be an instance of int16";
- boolean positive = true;
- short value = 0;
- int offset = 0;
-
- if (int16.charAt(offset) == '+') {
- offset++;
- } else if (int16.charAt(offset) == '-') {
- offset++;
- positive = false;
- }
- for (; offset < int16.length(); offset++) {
- if (int16.charAt(offset) >= '0' && int16.charAt(offset) <= '9') {
- value = (short) (value * 10 + int16.charAt(offset) - '0');
- } else if (int16.charAt(offset) == 'i' && int16.charAt(offset + 1) == '1' && int16.charAt(offset + 2) == '6'
- && offset + 3 == int16.length()) {
- break;
- } else {
- throw new ParseException(errorMessage);
- }
- }
- if (value < 0) {
- throw new ParseException(errorMessage);
- }
- if (value > 0 && !positive) {
- value *= -1;
- }
- aInt16.setValue(value);
- int16Serde.serialize(aInt16, out);
- }
-
- private void parseInt32(String int32, DataOutput out) throws AsterixException, HyracksDataException {
- String errorMessage = "This can not be an instance of int32";
- boolean positive = true;
- int value = 0;
- int offset = 0;
-
- if (int32.charAt(offset) == '+') {
- offset++;
- } else if (int32.charAt(offset) == '-') {
- offset++;
- positive = false;
- }
- for (; offset < int32.length(); offset++) {
- if (int32.charAt(offset) >= '0' && int32.charAt(offset) <= '9') {
- value = (value * 10 + int32.charAt(offset) - '0');
- } else if (int32.charAt(offset) == 'i' && int32.charAt(offset + 1) == '3' && int32.charAt(offset + 2) == '2'
- && offset + 3 == int32.length()) {
- break;
- } else {
- throw new ParseException(errorMessage);
- }
- }
- if (value < 0) {
- throw new ParseException(errorMessage);
- }
- if (value > 0 && !positive) {
- value *= -1;
- }
-
- aInt32.setValue(value);
- int32Serde.serialize(aInt32, out);
- }
-
- private void parseInt64(String int64, DataOutput out) throws AsterixException, HyracksDataException {
- String errorMessage = "This can not be an instance of int64";
- boolean positive = true;
- long value = 0;
- int offset = 0;
-
- if (int64.charAt(offset) == '+') {
- offset++;
- } else if (int64.charAt(offset) == '-') {
- offset++;
- positive = false;
- }
- for (; offset < int64.length(); offset++) {
- if (int64.charAt(offset) >= '0' && int64.charAt(offset) <= '9') {
- value = (value * 10 + int64.charAt(offset) - '0');
- } else if (int64.charAt(offset) == 'i' && int64.charAt(offset + 1) == '6' && int64.charAt(offset + 2) == '4'
- && offset + 3 == int64.length()) {
- break;
- } else {
- throw new ParseException(errorMessage);
- }
- }
- if (value < 0) {
- throw new ParseException(errorMessage);
- }
- if (value > 0 && !positive) {
- value *= -1;
- }
-
- aInt64.setValue(value);
- int64Serde.serialize(aInt64, out);
- }
-
- /**
- * Resets the pools before parsing a top-level record.
- * In this way the elements in those pools can be re-used.
- */
- private void resetPools() {
- listBuilderPool.reset();
- recordBuilderPool.reset();
- abvsBuilderPool.reset();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
deleted file mode 100644
index 794097f..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractDataParser.java
+++ /dev/null
@@ -1,521 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.om.base.ABinary;
-import org.apache.asterix.om.base.ABoolean;
-import org.apache.asterix.om.base.ACircle;
-import org.apache.asterix.om.base.ADate;
-import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.ADayTimeDuration;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.ADuration;
-import org.apache.asterix.om.base.AFloat;
-import org.apache.asterix.om.base.AInt16;
-import org.apache.asterix.om.base.AInt32;
-import org.apache.asterix.om.base.AInt64;
-import org.apache.asterix.om.base.AInt8;
-import org.apache.asterix.om.base.AInterval;
-import org.apache.asterix.om.base.ALine;
-import org.apache.asterix.om.base.AMutableBinary;
-import org.apache.asterix.om.base.AMutableCircle;
-import org.apache.asterix.om.base.AMutableDate;
-import org.apache.asterix.om.base.AMutableDateTime;
-import org.apache.asterix.om.base.AMutableDayTimeDuration;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableDuration;
-import org.apache.asterix.om.base.AMutableFloat;
-import org.apache.asterix.om.base.AMutableInt16;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableInt8;
-import org.apache.asterix.om.base.AMutableInterval;
-import org.apache.asterix.om.base.AMutableLine;
-import org.apache.asterix.om.base.AMutablePoint;
-import org.apache.asterix.om.base.AMutablePoint3D;
-import org.apache.asterix.om.base.AMutableRectangle;
-import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.AMutableTime;
-import org.apache.asterix.om.base.AMutableUUID;
-import org.apache.asterix.om.base.AMutableYearMonthDuration;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.base.APoint;
-import org.apache.asterix.om.base.APoint3D;
-import org.apache.asterix.om.base.ARectangle;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.base.ATime;
-import org.apache.asterix.om.base.AUUID;
-import org.apache.asterix.om.base.AYearMonthDuration;
-import org.apache.asterix.om.base.temporal.ADateParserFactory;
-import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.om.base.temporal.ADurationParserFactory.ADurationParseOption;
-import org.apache.asterix.om.base.temporal.ATimeParserFactory;
-import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.bytes.Base64Parser;
-import org.apache.hyracks.util.bytes.HexParser;
-
-/**
- * Base class for data parsers. Holds reusable mutable ADM values and the serializers/deserializers
- * for the built-in ADM types. It also provides helpers that parse the textual form of temporal,
- * spatial, binary and interval values and serialize the result to a {@link DataOutput}.
- * <p>
- * The parse helpers write into shared instance fields (e.g. {@code aPoint}) without
- * synchronization, so a single instance should not be used by several threads at once.
- */
-public abstract class AbstractDataParser implements IDataParser {
-
-    // Reusable mutable holders: each parse method overwrites one of these, then serializes it.
-    protected AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    protected AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    protected AMutableInt32 aInt32 = new AMutableInt32(0);
-    protected AMutableInt64 aInt64 = new AMutableInt64(0);
-    protected AMutableDouble aDouble = new AMutableDouble(0);
-    protected AMutableFloat aFloat = new AMutableFloat(0);
-    protected AMutableString aString = new AMutableString("");
-    protected AMutableBinary aBinary = new AMutableBinary(null, 0, 0);
-    protected AMutableString aStringFieldName = new AMutableString("");
-    protected AMutableUUID aUUID = new AMutableUUID(0, 0);
-    // For temporal and spatial data types
-    protected AMutableTime aTime = new AMutableTime(0);
-    protected AMutableDateTime aDateTime = new AMutableDateTime(0L);
-    protected AMutableDuration aDuration = new AMutableDuration(0, 0);
-    protected AMutableDayTimeDuration aDayTimeDuration = new AMutableDayTimeDuration(0);
-    protected AMutableYearMonthDuration aYearMonthDuration = new AMutableYearMonthDuration(0);
-    protected AMutablePoint aPoint = new AMutablePoint(0, 0);
-    protected AMutablePoint3D aPoint3D = new AMutablePoint3D(0, 0, 0);
-    protected AMutableCircle aCircle = new AMutableCircle(null, 0);
-    protected AMutableRectangle aRectangle = new AMutableRectangle(null, null);
-    protected AMutablePoint aPoint2 = new AMutablePoint(0, 0);
-    protected AMutableLine aLine = new AMutableLine(null, null);
-    protected AMutableDate aDate = new AMutableDate(0);
-    protected final AMutableInterval aInterval = new AMutableInterval(0L, 0L, (byte) 0);
-
-    // Serializers
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADOUBLE);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ASTRING);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ABinary> binarySerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABINARY);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AFloat> floatSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AFLOAT);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt8> int8Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT8);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt16> int16Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT16);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT32);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ANULL);
-
-    protected final HexParser hexParser = new HexParser();
-    protected final Base64Parser base64Parser = new Base64Parser();
-
-    // For UUID, we assume that the format is the string representation of UUID
-    // (xxxxxxxx-xxxx-xxxx-xxxxxxxxxxxx) when parsing the data.
-    // Thus, we need to call UUID.fromStringToAMuatbleUUID() to convert it to the internal representation (two long values).
-    @SuppressWarnings("unchecked")
-    protected ISerializerDeserializer<AUUID> uuidSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AUUID);
-
-    // Serdes for temporal and spatial data types.
-    // NOTE(review): an older comment here said these must be one per parser "to avoid race
-    // conditions", yet they are static and therefore shared by every parser instance. Presumably
-    // these serdes are stateless and safe to share -- confirm before relying on it.
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ATime> timeSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ATIME);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADate> dateSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADATE);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADateTime> datetimeSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADATETIME);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADuration> durationSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADURATION);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<ADayTimeDuration> dayTimeDurationSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ADAYTIMEDURATION);
-    @SuppressWarnings("unchecked")
-    protected static final ISerializerDeserializer<AYearMonthDuration> yearMonthDurationSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AYEARMONTHDURATION);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<APoint> pointSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.APOINT);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<APoint3D> point3DSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.APOINT3D);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<ACircle> circleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ACIRCLE);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<ARectangle> rectangleSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ARECTANGLE);
-    @SuppressWarnings("unchecked")
-    protected final static ISerializerDeserializer<ALine> lineSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ALINE);
-    @SuppressWarnings("unchecked")
-    private static final ISerializerDeserializer<AInterval> intervalSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINTERVAL);
-
-    protected String filename;
-
-    void setFilename(String filename) {
-        this.filename = filename;
-    }
-
-    /**
-     * Parses a time literal and serializes it as an ADM time.
-     *
-     * @throws HyracksDataException if the text cannot be parsed or serialization fails
-     */
-    protected void parseTime(String time, DataOutput out) throws HyracksDataException {
-        int chrononTimeInMs;
-        try {
-            chrononTimeInMs = ATimeParserFactory.parseTimePart(time, 0, time.length());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        aTime.setValue(chrononTimeInMs);
-        timeSerde.serialize(aTime, out);
-    }
-
-    /**
-     * Parses a date literal and serializes it as an ADM date, i.e. as a number of whole days
-     * (chronon milliseconds divided by {@code GregorianCalendarSystem.CHRONON_OF_DAY}).
-     *
-     * @throws HyracksDataException if the text cannot be parsed or serialization fails
-     */
-    protected void parseDate(String date, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMs = 0;
-        try {
-            chrononTimeInMs = ADateParserFactory.parseDatePart(date, 0, date.length());
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        // Integer division truncates toward zero; for a negative chronon that is not a whole
-        // number of days, step back one more day so the result is rounded toward negative infinity.
-        short temp = 0;
-        if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
-            temp = 1;
-        }
-        aDate.setValue((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - temp);
-        dateSerde.serialize(aDate, out);
-    }
-
-    /**
-     * Parses a datetime literal ({@code <date>T<time>}) and serializes it as an ADM datetime.
-     *
-     * @throws HyracksDataException if the 'T' separator is missing, either part cannot be parsed,
-     *                              or serialization fails
-     */
-    protected void parseDateTime(String datetime, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMs = 0;
-        try {
-            // +1 if it is negative (-)
-            short timeOffset = (short) ((datetime.charAt(0) == '-') ? 1 : 0);
-
-            // The 'T' is expected either 8 or 10 characters after the optional sign (presumably
-            // the compact and the dash-separated date forms -- confirm against the date parser).
-            timeOffset += 8;
-
-            if (datetime.charAt(timeOffset) != 'T') {
-                timeOffset += 2;
-                if (datetime.charAt(timeOffset) != 'T') {
-                    throw new AlgebricksException("This can not be an instance of datetime: missing T");
-                }
-            }
-            chrononTimeInMs = ADateParserFactory.parseDatePart(datetime, 0, timeOffset);
-            chrononTimeInMs += ATimeParserFactory.parseTimePart(datetime, timeOffset + 1,
-                    datetime.length() - timeOffset - 1);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-        aDateTime.setValue(chrononTimeInMs);
-        datetimeSerde.serialize(aDateTime, out);
-    }
-
-    /** Parses a duration literal and serializes it as an ADM duration. */
-    protected void parseDuration(String duration, DataOutput out) throws HyracksDataException {
-        try {
-            ADurationParserFactory.parseDuration(duration, 0, duration.length(), aDuration, ADurationParseOption.All);
-            durationSerde.serialize(aDuration, out);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Parses a duration literal and serializes it as an ADM day-time duration. */
-    protected void parseDateTimeDuration(String durationString, DataOutput out) throws HyracksDataException {
-        try {
-            ADurationParserFactory.parseDuration(durationString, 0, durationString.length(), aDayTimeDuration,
-                    ADurationParseOption.All);
-            dayTimeDurationSerde.serialize(aDayTimeDuration, out);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Parses a duration literal and serializes it as an ADM year-month duration. */
-    protected void parseYearMonthDuration(String durationString, DataOutput out) throws HyracksDataException {
-        try {
-            ADurationParserFactory.parseDuration(durationString, 0, durationString.length(), aYearMonthDuration,
-                    ADurationParseOption.All);
-            yearMonthDurationSerde.serialize(aYearMonthDuration, out);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /**
-     * Parses {@code "x,y"} and serializes it as an ADM point.
-     *
-     * @throws HyracksDataException if the text is malformed (the cause is attached) or
-     *                              serialization fails
-     */
-    protected void parsePoint(String point, DataOutput out) throws HyracksDataException {
-        try {
-            int commaIndex = point.indexOf(',');
-            aPoint.setValue(Double.parseDouble(point.substring(0, commaIndex)),
-                    Double.parseDouble(point.substring(commaIndex + 1)));
-        } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
-            // Malformed numbers raise NumberFormatException (an IllegalArgumentException); a
-            // missing comma makes substring throw StringIndexOutOfBoundsException.
-            throw new HyracksDataException(point + " can not be an instance of point", e);
-        }
-        pointSerde.serialize(aPoint, out);
-    }
-
-    /**
-     * Parses {@code "x,y,z"} and serializes it as an ADM 3D point.
-     *
-     * @throws HyracksDataException if the text is malformed (the cause is attached) or
-     *                              serialization fails
-     */
-    protected void parse3DPoint(String point3d, DataOutput out) throws HyracksDataException {
-        try {
-            int firstCommaIndex = point3d.indexOf(',');
-            int secondCommaIndex = point3d.indexOf(',', firstCommaIndex + 1);
-            aPoint3D.setValue(Double.parseDouble(point3d.substring(0, firstCommaIndex)),
-                    Double.parseDouble(point3d.substring(firstCommaIndex + 1, secondCommaIndex)),
-                    Double.parseDouble(point3d.substring(secondCommaIndex + 1)));
-        } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
-            throw new HyracksDataException(point3d + " can not be an instance of point3d", e);
-        }
-        point3DSerde.serialize(aPoint3D, out);
-    }
-
-    /**
-     * Parses {@code "x,y r"} (center point, a space, then the radius) and serializes it as an
-     * ADM circle.
-     *
-     * @throws HyracksDataException if the text is malformed (the cause is attached) or
-     *                              serialization fails
-     */
-    protected void parseCircle(String circle, DataOutput out) throws HyracksDataException {
-        try {
-            String[] parts = circle.split(" ");
-            String[] center = parts[0].split(",");
-            aPoint.setValue(Double.parseDouble(center[0]), Double.parseDouble(center[1]));
-            aCircle.setValue(aPoint, Double.parseDouble(parts[1]));
-        } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
-            throw new HyracksDataException(circle + " can not be an instance of circle", e);
-        }
-        circleSerde.serialize(aCircle, out);
-    }
-
-    /**
-     * Parses two space-separated {@code "x,y"} corner points and serializes them as an ADM
-     * rectangle. The corners must be (bottom-left, top-right) in either order.
-     *
-     * @throws HyracksDataException if the text is malformed or the corners are not strictly
-     *                              ordered on both axes (the cause is attached), or if
-     *                              serialization fails
-     */
-    protected void parseRectangle(String rectangle, DataOutput out) throws HyracksDataException {
-        try {
-            String[] points = rectangle.split(" ");
-            if (points.length != 2) {
-                throw new IllegalArgumentException("rectangle consists of only 2 points.");
-            }
-            String[] first = points[0].split(",");
-            String[] second = points[1].split(",");
-            aPoint.setValue(Double.parseDouble(first[0]), Double.parseDouble(first[1]));
-            aPoint2.setValue(Double.parseDouble(second[0]), Double.parseDouble(second[1]));
-            if (aPoint.getX() > aPoint2.getX() && aPoint.getY() > aPoint2.getY()) {
-                aRectangle.setValue(aPoint2, aPoint);
-            } else if (aPoint.getX() < aPoint2.getX() && aPoint.getY() < aPoint2.getY()) {
-                aRectangle.setValue(aPoint, aPoint2);
-            } else {
-                throw new IllegalArgumentException(
-                        "Rectangle arugment must be either (bottom left point, top right point) or (top right point, bottom left point)");
-            }
-        } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
-            throw new HyracksDataException(rectangle + " can not be an instance of rectangle", e);
-        }
-        rectangleSerde.serialize(aRectangle, out);
-    }
-
-    /**
-     * Parses two space-separated {@code "x,y"} end points and serializes them as an ADM line.
-     *
-     * @throws HyracksDataException if the text is malformed (the cause is attached) or
-     *                              serialization fails
-     */
-    protected void parseLine(String line, DataOutput out) throws HyracksDataException {
-        try {
-            String[] points = line.split(" ");
-            if (points.length != 2) {
-                throw new IllegalArgumentException("line consists of only 2 points.");
-            }
-            String[] first = points[0].split(",");
-            String[] second = points[1].split(",");
-            aPoint.setValue(Double.parseDouble(first[0]), Double.parseDouble(first[1]));
-            aPoint2.setValue(Double.parseDouble(second[0]), Double.parseDouble(second[1]));
-            aLine.setValue(aPoint, aPoint2);
-        } catch (IllegalArgumentException | IndexOutOfBoundsException e) {
-            throw new HyracksDataException(line + " can not be an instance of line", e);
-        }
-        lineSerde.serialize(aLine, out);
-    }
-
-    /** Decodes {@code length} hex characters of {@code input} starting at {@code start} into an ADM binary. */
-    protected void parseHexBinaryString(char[] input, int start, int length, DataOutput out)
-            throws HyracksDataException {
-        hexParser.generateByteArrayFromHexString(input, start, length);
-        aBinary.setValue(hexParser.getByteArray(), 0, hexParser.getLength());
-        binarySerde.serialize(aBinary, out);
-    }
-
-    /** Decodes {@code length} Base64 characters of {@code input} starting at {@code start} into an ADM binary. */
-    protected void parseBase64BinaryString(char[] input, int start, int length, DataOutput out)
-            throws HyracksDataException {
-        base64Parser.generatePureByteArrayFromBase64String(input, start, length);
-        aBinary.setValue(base64Parser.getByteArray(), 0, base64Parser.getLength());
-        binarySerde.serialize(aBinary, out);
-    }
-
-    /**
-     * Parses {@code "<datetime>, <datetime>"} and serializes it as an ADM datetime interval.
-     * Quotes and spaces around each datetime are ignored.
-     *
-     * @throws HyracksDataException if the comma or either 'T' separator is missing, a part cannot
-     *                              be parsed, the interval is rejected, or serialization fails
-     */
-    protected void parseDateTimeInterval(String interval, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMsStart = 0;
-        long chrononTimeInMsEnd = 0;
-        try {
-            int commaIndex = interval.indexOf(',');
-            if (commaIndex < 1) {
-                throw new AlgebricksException("comma is missing for a string of interval");
-            }
-
-            // Interval start: the 'T' must lie before the comma, otherwise it belongs to the end
-            // datetime and the start would be parsed across the comma.
-            int timeSeparatorIndex = interval.indexOf('T');
-            if (timeSeparatorIndex < 0 || timeSeparatorIndex > commaIndex) {
-                throw new AlgebricksException(
-                        "This can not be an instance of interval: missing T for a datetime value.");
-            }
-            chrononTimeInMsStart = parseDatePart(interval, 0, timeSeparatorIndex - 1);
-            chrononTimeInMsStart += parseTimePart(interval, timeSeparatorIndex + 1, commaIndex - 1);
-
-            // Interval end
-            int endStartOffset = commaIndex + 1;
-            timeSeparatorIndex = interval.indexOf('T', endStartOffset);
-            if (timeSeparatorIndex < 0) {
-                throw new AlgebricksException(
-                        "This can not be an instance of interval: missing T for a datetime value.");
-            }
-            chrononTimeInMsEnd = parseDatePart(interval, endStartOffset, timeSeparatorIndex - 1);
-            chrononTimeInMsEnd += parseTimePart(interval, timeSeparatorIndex + 1, interval.length() - 1);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        try {
-            aInterval.setValue(chrononTimeInMsStart, chrononTimeInMsEnd, ATypeTag.DATETIME.serialize());
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-        intervalSerde.serialize(aInterval, out);
-    }
-
-    /**
-     * Parses {@code "<time>, <time>"} and serializes it as an ADM time interval. A negative
-     * parsed time is shifted forward by one day.
-     *
-     * @throws HyracksDataException if the comma is missing, a part cannot be parsed, the interval
-     *                              is rejected, or serialization fails
-     */
-    protected void parseTimeInterval(String interval, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMsStart = 0;
-        long chrononTimeInMsEnd = 0;
-        try {
-            // Same check as the other interval parsers: a comma in position 0 leaves no start value.
-            int commaIndex = interval.indexOf(',');
-            if (commaIndex < 1) {
-                throw new AlgebricksException("comma is missing for a string of interval");
-            }
-
-            // Interval Start
-            chrononTimeInMsStart = parseTimePart(interval, 0, commaIndex - 1);
-            if (chrononTimeInMsStart < 0) {
-                chrononTimeInMsStart += GregorianCalendarSystem.CHRONON_OF_DAY;
-            }
-
-            // Interval End
-            chrononTimeInMsEnd = parseTimePart(interval, commaIndex + 1, interval.length() - 1);
-            if (chrononTimeInMsEnd < 0) {
-                chrononTimeInMsEnd += GregorianCalendarSystem.CHRONON_OF_DAY;
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        try {
-            aInterval.setValue(chrononTimeInMsStart, chrononTimeInMsEnd, ATypeTag.TIME.serialize());
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-        intervalSerde.serialize(aInterval, out);
-    }
-
-    /**
-     * Parses {@code "<date>, <date>"} and serializes it as an ADM date interval, in whole days.
-     *
-     * @throws HyracksDataException if the comma is missing, a part cannot be parsed, the interval
-     *                              is rejected, or serialization fails
-     */
-    protected void parseDateInterval(String interval, DataOutput out) throws HyracksDataException {
-        long chrononTimeInMsStart = 0;
-        long chrononTimeInMsEnd = 0;
-        try {
-            int commaIndex = interval.indexOf(',');
-            if (commaIndex < 1) {
-                throw new AlgebricksException("comma is missing for a string of interval");
-            }
-            chrononTimeInMsStart = parseDatePart(interval, 0, commaIndex - 1);
-            chrononTimeInMsEnd = parseDatePart(interval, commaIndex + 1, interval.length() - 1);
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-
-        try {
-            aInterval.setValue((chrononTimeInMsStart / GregorianCalendarSystem.CHRONON_OF_DAY),
-                    (chrononTimeInMsEnd / GregorianCalendarSystem.CHRONON_OF_DAY), ATypeTag.DATE.serialize());
-        } catch (AlgebricksException e) {
-            throw new HyracksDataException(e);
-        }
-        intervalSerde.serialize(aInterval, out);
-    }
-
-    /**
-     * Parses the date in {@code interval[startOffset..endOffset]} (both inclusive) after trimming
-     * surrounding quotes and spaces. Trimming never crosses the segment's own bounds.
-     */
-    private long parseDatePart(String interval, int startOffset, int endOffset)
-            throws AlgebricksException, HyracksDataException {
-        while (endOffset >= startOffset && isQuoteOrSpace(interval.charAt(endOffset))) {
-            endOffset--;
-        }
-        while (startOffset <= endOffset && isQuoteOrSpace(interval.charAt(startOffset))) {
-            startOffset++;
-        }
-        if (startOffset > endOffset) {
-            throw new AlgebricksException("This can not be an instance of interval: empty date value.");
-        }
-        return ADateParserFactory.parseDatePart(interval, startOffset, endOffset - startOffset + 1);
-    }
-
-    /**
-     * Parses the time in {@code interval[startOffset..endOffset]} (both inclusive) after trimming
-     * surrounding quotes and spaces. Trimming never crosses the segment's own bounds.
-     */
-    private int parseTimePart(String interval, int startOffset, int endOffset)
-            throws AlgebricksException, HyracksDataException {
-        while (endOffset >= startOffset && isQuoteOrSpace(interval.charAt(endOffset))) {
-            endOffset--;
-        }
-        while (startOffset <= endOffset && isQuoteOrSpace(interval.charAt(startOffset))) {
-            startOffset++;
-        }
-        if (startOffset > endOffset) {
-            throw new AlgebricksException("This can not be an instance of interval: empty time value.");
-        }
-        return ATimeParserFactory.parseTimePart(interval, startOffset, endOffset - startOffset + 1);
-    }
-
-    /** Returns true for the padding characters stripped around interval components. */
-    private static boolean isQuoteOrSpace(char c) {
-        return c == '"' || c == ' ';
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
deleted file mode 100644
index f3199e9..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AbstractTupleParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-
-/**
- * An abstract implementation of {@link ITupleParser}. It drives an {@link IDataParser} over an
- * input stream and wraps each parsed record in a single-field tuple. Each tuple is handed to an
- * {@link ITupleForwardPolicy}, which forwards it to the frame writer.
- */
-public abstract class AbstractTupleParser implements ITupleParser {
-
-    protected static Logger LOGGER = Logger.getLogger(AbstractTupleParser.class.getName());
-
-    // Builds one-field tuples; the single field holds the record serialized by the data parser.
-    protected ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-    protected DataOutput dos = tb.getDataOutput();
-    protected final ARecordType recType;
-    protected final IHyracksCommonContext ctx;
-
-    public AbstractTupleParser(IHyracksCommonContext ctx, ARecordType recType) throws HyracksDataException {
-        this.recType = recType;
-        this.ctx = ctx;
-    }
-
-    /** Returns the record parser to use; {@link #parse} calls this once per invocation. */
-    public abstract IDataParser getDataParser();
-
-    /** Returns the policy that forwards parsed tuples; {@link #parse} calls this once per invocation. */
-    public abstract ITupleForwardPolicy getTupleParserPolicy();
-
-    /**
-     * Parses every record in {@code in} and forwards each one, as a tuple, through the forward
-     * policy to {@code writer}. The policy is closed after the data parser reports end of input.
-     *
-     * @throws HyracksDataException on any parse, I/O or forwarding failure
-     */
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        IDataParser parser = getDataParser();
-        ITupleForwardPolicy policy = getTupleParserPolicy();
-        try {
-            parser.initialize(in, recType, true);
-            policy.initialize(ctx, writer);
-            while (true) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                policy.addTuple(tb);
-            }
-            policy.close();
-        } catch (HyracksDataException e) {
-            // Already the declared type: rethrow unchanged rather than letting the IOException
-            // handler below wrap it a second time (Hyracks exceptions extend IOException).
-            throw e;
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
deleted file mode 100644
index 2053a75..0000000
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/file/AsterixTupleParserFactory.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.file;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.parse.ITupleForwardPolicy;
-import org.apache.asterix.common.parse.ITupleForwardPolicy.TupleForwardPolicyType;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.file.ITupleParser;
-import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-
-/**
- * {@link ITupleParserFactory} for external data. If the configuration names a custom factory under
- * {@value #KEY_PARSER_FACTORY}, that class is instantiated reflectively and used. Otherwise a built-in ADM or
- * delimited-text data parser is paired with the tuple forward policy selected by the configuration.
- */
-public class AsterixTupleParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Input formats known to this factory; {@code UNKNOWN} defers to the {@value #KEY_FORMAT} property. */
-    public static enum InputDataFormat {
-        ADM,
-        DELIMITED,
-        UNKNOWN
-    }
-
-    public static final String HAS_HEADER = "has.header";
-    public static final String KEY_FORMAT = "format";
-    public static final String FORMAT_ADM = "adm";
-    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
-    public static final String FORMAT_BINARY = "binary";
-
-    public static final String KEY_PATH = "path";
-    public static final String KEY_SOURCE_DATATYPE = "type-name";
-    public static final String KEY_DELIMITER = "delimiter";
-    public static final String KEY_PARSER_FACTORY = "parser";
-    public static final String KEY_HEADER = "header";
-    public static final String KEY_QUOTE = "quote";
-    public static final String TIME_TRACKING = "time.tracking";
-    public static final String DEFAULT_QUOTE = "\"";
-    public static final String AT_LEAST_ONE_SEMANTICS = FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS;
-    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-    public static final String DEFAULT_DELIMITER = ",";
-
-    // Value parser used for a delimited-text column, keyed by the column's type tag. Populated once at class init.
-    private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
-    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
-        m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
-        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
-        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-        return m;
-    }
-
-    private final ARecordType recordType;
-    private final Map<String, String> configuration;
-    private final InputDataFormat inputDataFormat;
-
-    /**
-     * @param configuration
-     *            external-data properties (format, delimiter, quote, header, parser factory, forward policy, ...)
-     * @param recType
-     *            record type produced by the created parsers
-     * @param dataFormat
-     *            input format; {@link InputDataFormat#UNKNOWN} means "read it from {@value #KEY_FORMAT}"
-     */
-    public AsterixTupleParserFactory(Map<String, String> configuration, ARecordType recType, InputDataFormat dataFormat) {
-        this.recordType = recType;
-        this.configuration = configuration;
-        this.inputDataFormat = dataFormat;
-    }
-
-    /**
-     * Creates a tuple parser. A custom factory class named by {@value #KEY_PARSER_FACTORY} takes precedence.
-     * Otherwise a {@link GenericTupleParser} is built from the configured data parser and forward policy.
-     *
-     * @throws HyracksDataException
-     *             if the parser cannot be created; an underlying {@code HyracksDataException} is rethrown as-is
-     */
-    @Override
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException {
-        try {
-            String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
-            if (parserFactoryClassname != null) {
-                ITupleParserFactory parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname)
-                        .newInstance();
-                return parserFactory.createTupleParser(ctx);
-            }
-            IDataParser dataParser = createDataParser(ctx);
-            ITupleForwardPolicy policy = getTupleParserPolicy(configuration);
-            policy.configure(configuration);
-            return new GenericTupleParser(ctx, recordType, dataParser, policy);
-        } catch (HyracksDataException e) {
-            // Already the declared exception type; do not wrap it a second time.
-            throw e;
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    /** Tuple parser that simply returns the data parser and forward policy it was constructed with. */
-    private static class GenericTupleParser extends AbstractTupleParser {
-
-        private final IDataParser dataParser;
-
-        private final ITupleForwardPolicy policy;
-
-        public GenericTupleParser(IHyracksCommonContext ctx, ARecordType recType, IDataParser dataParser,
-                ITupleForwardPolicy policy) throws HyracksDataException {
-            super(ctx, recType);
-            this.dataParser = dataParser;
-            this.policy = policy;
-        }
-
-        @Override
-        public IDataParser getDataParser() {
-            return dataParser;
-        }
-
-        @Override
-        public ITupleForwardPolicy getTupleParserPolicy() {
-            return policy;
-        }
-
-    }
-
-    /**
-     * Builds the data parser for {@link #inputDataFormat}. For {@code UNKNOWN} the {@value #KEY_FORMAT} property
-     * is matched case-insensitively against {@value #FORMAT_ADM} and {@value #FORMAT_DELIMITED_TEXT}.
-     *
-     * @throws IllegalArgumentException
-     *             if the format is unspecified or unsupported
-     */
-    private IDataParser createDataParser(IHyracksCommonContext ctx) throws Exception {
-        switch (inputDataFormat) {
-            case ADM:
-                return new ADMDataParser();
-            case DELIMITED:
-                return configureDelimitedDataParser(ctx);
-            case UNKNOWN: {
-                String specifiedFormat = configuration.get(KEY_FORMAT);
-                if (specifiedFormat == null) {
-                    throw new IllegalArgumentException(" Unspecified data format");
-                }
-                // equalsIgnoreCase already ignores case; no (locale-sensitive) upper-casing needed.
-                if (FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
-                    return new ADMDataParser();
-                }
-                if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-                    return configureDelimitedDataParser(ctx);
-                }
-                throw new IllegalArgumentException(" format " + specifiedFormat + " not supported");
-            }
-            default:
-                // Not reachable with the current enum constants; mirrors the original fall-through result.
-                return null;
-        }
-    }
-
-    /**
-     * Returns a new tuple forward policy chosen by the {@code ITupleForwardPolicy.PARSER_POLICY} property.
-     * The property value is trimmed and upper-cased (locale-independently) before being mapped to a
-     * {@link TupleForwardPolicyType}. When the property is absent, {@code FRAME_FULL} is used.
-     *
-     * @throws IllegalArgumentException
-     *             if the property names no known policy type
-     */
-    public static ITupleForwardPolicy getTupleParserPolicy(Map<String, String> configuration) {
-        String propValue = configuration.get(ITupleForwardPolicy.PARSER_POLICY);
-        TupleForwardPolicyType policyType;
-        if (propValue == null) {
-            policyType = TupleForwardPolicyType.FRAME_FULL;
-        } else {
-            // Locale.ROOT: default-locale upper-casing breaks enum lookup under e.g. a Turkish locale (i -> İ).
-            policyType = TupleForwardPolicyType.valueOf(propValue.trim().toUpperCase(Locale.ROOT));
-        }
-        switch (policyType) {
-            case FRAME_FULL:
-                return new FrameFullTupleForwardPolicy();
-            case COUNTER_TIMER_EXPIRED:
-                return new CounterTimerTupleForwardPolicy();
-            case RATE_CONTROLLED:
-                return new RateControlledTupleForwardPolicy();
-            default:
-                // Not reachable with the current enum constants; kept identical to the original null result.
-                return null;
-        }
-    }
-
-    /** Builds a delimited-text parser from the record type and the delimiter/quote/header properties. */
-    private IDataParser configureDelimitedDataParser(IHyracksCommonContext ctx) throws AsterixException {
-        IValueParserFactory[] valueParserFactories = getValueParserFactories();
-        char delimiter = getDelimiter(configuration);
-        char quote = getQuote(configuration, delimiter);
-        boolean hasHeader = hasHeader();
-        return new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
-    }
-
-    /** @return whether the configuration declares a header line; delegates to {@link #getHasHeader(Map)}. */
-    private boolean hasHeader() {
-        return getHasHeader(configuration);
-    }
-
-    /**
-     * Picks a value parser for every field of {@link #recordType}. A UNION field is accepted only when it is
-     * exactly {@code [NULL, T]} (an optional field); its parser is the one for {@code T}.
-     *
-     * @throws NotImplementedException
-     *             for any other union shape, or a type with no delimited value parser
-     */
-    private IValueParserFactory[] getValueParserFactories() {
-        IAType[] fieldTypes = recordType.getFieldTypes();
-        int n = fieldTypes.length;
-        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-        for (int i = 0; i < n; i++) {
-            ATypeTag tag;
-            if (fieldTypes[i].getTypeTag() == ATypeTag.UNION) {
-                List<IAType> unionTypes = ((AUnionType) fieldTypes[i]).getUnionList();
-                // Reject unless the union is exactly [NULL, T]. '||' short-circuits, so get(0) is never
-                // evaluated on an empty list. (The previous '&&' let non-optional unions through.)
-                if (unionTypes.size() != 2 || unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
-                    throw new NotImplementedException("Non-optional UNION type is not supported.");
-                }
-                tag = unionTypes.get(1).getTypeTag();
-            } else {
-                tag = fieldTypes[i].getTypeTag();
-            }
-            if (tag == null) {
-                throw new NotImplementedException("Failed to get the type information for field " + i + ".");
-            }
-            IValueParserFactory vpf = valueParserFactoryMap.get(tag);
-            if (vpf == null) {
-                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-            }
-            fieldParserFactories[i] = vpf;
-        }
-        return fieldParserFactories;
-    }
-
-    /**
-     * Reads the single-character delimiter from {@value #KEY_DELIMITER}. When the property is absent,
-     * {@value #DEFAULT_DELIMITER} is used.
-     *
-     * @throws AsterixException
-     *             if the configured value is not exactly one character long
-     */
-    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
-        String delimiterValue = configuration.get(KEY_DELIMITER);
-        if (delimiterValue == null) {
-            delimiterValue = DEFAULT_DELIMITER;
-        } else if (delimiterValue.length() != 1) {
-            throw new AsterixException("'" + delimiterValue
-                    + "' is not a valid delimiter. The length of a delimiter should be 1.");
-        }
-        return delimiterValue.charAt(0);
-    }
-
-    /**
-     * Reads the single-character quote from {@value #KEY_QUOTE}. When the property is absent, a double quote is
-     * used. The delimiter is passed in so that a quote equal to it can be rejected.
-     *
-     * @throws AsterixException
-     *             if the value is not exactly one character long, or equals {@code delimiter}
-     */
-    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
-        String quoteValue = configuration.get(KEY_QUOTE);
-        if (quoteValue == null) {
-            quoteValue = DEFAULT_QUOTE;
-        } else if (quoteValue.length() != 1) {
-            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
-        }
-
-        // A char delimiter can never be null, so the only conflict to check is a shared character.
-        if (quoteValue.charAt(0) == delimiter) {
-            throw new AsterixException("Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter
-                    + "'. ");
-        }
-
-        return quoteValue.charAt(0);
-    }
-
-    /**
-     * @return {@code true} only if {@value #KEY_HEADER} is set to "true" (case-insensitive); absent means
-     *         {@code false}
-     */
-    public static boolean getHasHeader(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(KEY_HEADER));
-    }
-
-}