You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/03 18:41:07 UTC
[09/21] incubator-asterixdb git commit: First stage of external data
cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
new file mode 100644
index 0000000..146064a
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
+
+public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
+
+ private final IValueParserFactory[] valueParserFactories;
+ private final char fieldDelimiter;
+ private final char quote;
+ private final boolean hasHeader;
+ private ARecordType recordType;
+ private IARecordBuilder recBuilder;
+ private ArrayBackedValueStorage fieldValueBuffer;
+ private DataOutput fieldValueBufferOutput;
+ private IValueParser[] valueParsers;
+ private FieldCursorForDelimitedDataParser cursor;
+ private byte[] fieldTypeTags;
+ private int[] fldIds;
+ private ArrayBackedValueStorage[] nameBuffers;
+ private boolean areAllNullFields;
+ private boolean isStreamParser = true;
+
+ /**
+  * Creates a parser for delimiter-separated text. Only the settings are stored here; the value parsers,
+  * buffers and field cursor are created later by configure(...) and setInputStream(...).
+  *
+  * @param valueParserFactories factories from which configure(...) creates one value parser each
+  * @param fieldDelimter character handed to the field cursor as the field delimiter
+  * @param quote character handed to the field cursor as the quote character
+  * @param hasHeader when true, setInputStream(...) consumes the first record of the stream
+  */
+ public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimter, char quote,
+         boolean hasHeader) {
+     this.hasHeader = hasHeader;
+     this.quote = quote;
+     this.fieldDelimiter = fieldDelimter;
+     this.valueParserFactories = valueParserFactories;
+ }
+
+ @Override
+ public boolean parse(DataOutput out) throws AsterixException, IOException {
+ while (cursor.nextRecord()) {
+ parseRecord(out);
+ if (!areAllNullFields) {
+ recBuilder.write(out, true);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+  * Reads the fields of the cursor's current record into the record builder, one per value parser, and
+  * stops early when the cursor runs out of fields. Sets areAllNullFields to false as soon as one field
+  * is parsed as a real value.
+  *
+  * @param out record output; only touched by the null serializer call below
+  * @throws AsterixException when an empty field belongs to a non-optional, non-string type
+  */
+ private void parseRecord(DataOutput out) throws AsterixException, IOException {
+     recBuilder.reset(recordType);
+     recBuilder.init();
+     areAllNullFields = true;
+
+     for (int i = 0; i < valueParsers.length; ++i) {
+         if (!cursor.nextField()) {
+             break;
+         }
+         fieldValueBuffer.reset();
+
+         // A field is parsed as a value when it is non-empty, or when its declared type is STRING
+         // (an empty field is then an empty string) or NULL.
+         if (cursor.fStart != cursor.fEnd || recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.STRING
+                 || recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.NULL) {
+             fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
+             // Eliminate double quotes in the field that we are going to parse
+             if (cursor.isDoubleQuoteIncludedInThisField) {
+                 cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                 cursor.fEnd -= cursor.doubleQuoteCount;
+                 cursor.isDoubleQuoteIncludedInThisField = false;
+             }
+             valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
+                     fieldValueBufferOutput);
+             areAllNullFields = false;
+         } else {
+             // Empty field of any other type: only an optional type may take NULL.
+             if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
+                 throw new AsterixException("At record: " + cursor.recordCount + " - Field " + cursor.fieldCount
+                         + " is not an optional type so it cannot accept null value. ");
+             }
+             fieldValueBufferOutput.writeByte(ATypeTag.NULL.serialize());
+             // NOTE(review): this serializes to the record output `out`, not to the field buffer that
+             // received the NULL tag above - confirm that this is intended.
+             ANullSerializerDeserializer.INSTANCE.serialize(ANull.NULL, out);
+         }
+
+         // Fields the builder knows by id are added by id; the others are added by serialized name.
+         if (fldIds[i] >= 0) {
+             recBuilder.addField(fldIds[i], fieldValueBuffer);
+         } else {
+             recBuilder.addField(nameBuffers[i], fieldValueBuffer);
+         }
+     }
+ }
+
+ /**
+  * Serializes a field name into {@code buffer} using stringSerde, after resetting the buffer.
+  *
+  * @param fieldName name to serialize
+  * @param str reusable mutable string that carries the name to the serializer
+  * @param buffer storage that receives the serialized name
+  * @throws HyracksDataException wrapping any IOException raised by the serializer
+  */
+ protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
+         throws HyracksDataException {
+     buffer.reset();
+     str.setValue(fieldName);
+     try {
+         stringSerde.serialize(str, buffer.getDataOutput());
+     } catch (IOException e) {
+         throw new HyracksDataException(e);
+     }
+ }
+
+ /**
+  * Reports the mode this parser works in. isStreamParser starts out true and is recomputed from the
+  * configuration by configure(...).
+  *
+  * @return DataSourceType.STREAM in stream mode, DataSourceType.RECORDS otherwise
+  */
+ @Override
+ public DataSourceType getDataSourceType() {
+     if (isStreamParser) {
+         return DataSourceType.STREAM;
+     }
+     return DataSourceType.RECORDS;
+ }
+
+ /**
+  * Prepares all per-record state: value parsers, the field buffer, the record builder, the serialized
+  * type tag of every declared field and, for each field, either its builder id or its serialized name.
+  * In record mode a cursor without a reader is created as well.
+  *
+  * @param configuration adapter configuration, used to decide between stream and record mode
+  * @param recordType type of the records to produce
+  * @throws HyracksDataException when a field unknown to the builder belongs to a closed type
+  */
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType recordType) throws HyracksDataException {
+     this.recordType = recordType;
+
+     // One value parser per factory, in factory order.
+     final int parserCount = valueParserFactories.length;
+     valueParsers = new IValueParser[parserCount];
+     for (int p = 0; p < parserCount; p++) {
+         valueParsers[p] = valueParserFactories[p].createValueParser();
+     }
+
+     fieldValueBuffer = new ArrayBackedValueStorage();
+     fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
+     recBuilder = new RecordBuilder();
+     recBuilder.reset(recordType);
+     recBuilder.init();
+
+     final String[] names = recordType.getFieldNames();
+     final int n = names.length;
+
+     // Serialized type tag of every declared field.
+     fieldTypeTags = new byte[n];
+     for (int i = 0; i < n; i++) {
+         fieldTypeTags[i] = recordType.getFieldTypes()[i].getTypeTag().serialize();
+     }
+
+     // Builder id per field; a negative id means the field has to be added by name.
+     fldIds = new int[n];
+     nameBuffers = new ArrayBackedValueStorage[n];
+     final AMutableString scratch = new AMutableString(null);
+     for (int i = 0; i < n; i++) {
+         final String name = names[i];
+         final int id = recBuilder.getFieldId(name);
+         fldIds[i] = id;
+         if (id >= 0) {
+             continue;
+         }
+         if (!recordType.isOpen()) {
+             throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
+         }
+         nameBuffers[i] = new ArrayBackedValueStorage();
+         fieldNameToBytes(name, scratch, nameBuffers[i]);
+     }
+
+     isStreamParser = ExternalDataUtils.isDataSourceStreamProvider(configuration);
+     if (!isStreamParser) {
+         // Record mode: parse(IRawRecord, DataOutput) feeds the cursor directly, so no reader is attached.
+         cursor = new FieldCursorForDelimitedDataParser(null, fieldDelimiter, quote);
+     }
+ }
+
+ /**
+  * Record mode: points the cursor at the given raw record, parses it and writes it to {@code out}
+  * unless every field turned out to be null.
+  *
+  * @param record raw record holding the characters to parse
+  * @param out destination of the serialized record
+  */
+ @Override
+ public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws Exception {
+     cursor.nextRecord(record.get(), record.size());
+     parseRecord(out);
+     if (areAllNullFields) {
+         return;
+     }
+     recBuilder.write(out, true);
+ }
+
+ /** Returns the raw record class accepted in record mode, which is always char[]. */
+ @Override
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
+ }
+
+ /**
+  * Stream mode: creates a fresh field cursor over {@code in} and, when the parser was created with
+  * hasHeader, consumes the first record so that parsing starts after the header line.
+  *
+  * @param in stream to parse; when null, the cursor is created without a reader (as configure(...)
+  *            does in record mode) and no header is skipped
+  */
+ @Override
+ public void setInputStream(InputStream in) throws Exception {
+     // InputStreamReader rejects a null stream with a NullPointerException, so wrap only a real stream;
+     // otherwise the null check below could never be reached with a null argument.
+     cursor = new FieldCursorForDelimitedDataParser(in == null ? null : new InputStreamReader(in),
+             fieldDelimiter, quote);
+     if (in != null && hasHeader) {
+         cursor.nextRecord();
+         while (cursor.nextField()) {
+             // Discard every field of the header record.
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
new file mode 100644
index 0000000..fb61339
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/HiveRecordParser.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+@SuppressWarnings("deprecation")
+public class HiveRecordParser implements IRecordDataParser<Writable> {
+
+ private ARecordType aRecord;
+ private SerDe hiveSerde;
+ private StructObjectInspector oi;
+ private IARecordBuilder recBuilder;
+ private ArrayBackedValueStorage fieldValueBuffer;
+ private ArrayBackedValueStorage listItemBuffer;
+ private byte[] fieldTypeTags;
+ private IAType[] fieldTypes;
+ private OrderedListBuilder orderedListBuilder;
+ private UnorderedListBuilder unorderedListBuilder;
+ private List<? extends StructField> fieldRefs;
+ private UTF8StringWriter utf8Writer = new UTF8StringWriter();
+
+ /** Always returns DataSourceType.RECORDS; this class implements only IRecordDataParser. */
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ /**
+  * Builds the Hive SerDe for the given record type and allocates all per-record state. The SerDe class
+  * name is read from the configuration key ExternalDataConstants.KEY_HIVE_SERDE and the table schema
+  * (column names and types) is derived from {@code recordType}.
+  *
+  * @param configuration adapter configuration, also used to build the Hadoop job configuration
+  * @param recordType type of the records to produce
+  * @throws HyracksDataException wrapping any failure, including a missing SerDe class name
+  */
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType recordType) throws HyracksDataException {
+     try {
+         this.aRecord = recordType;
+         int n = aRecord.getFieldNames().length;
+         fieldTypes = aRecord.getFieldTypes();
+         JobConf hadoopConfiguration = HDFSUtils.configureHDFSJobConf(configuration);
+         // Create the hive table schema.
+         Properties tbl = new Properties();
+         tbl.put(Constants.LIST_COLUMNS, getCommaDelimitedColNames(aRecord));
+         tbl.put(Constants.LIST_COLUMN_TYPES, getColTypes(aRecord));
+         String hiveSerdeClassName = configuration.get(ExternalDataConstants.KEY_HIVE_SERDE);
+         if (hiveSerdeClassName == null) {
+             throw new IllegalArgumentException("no hive serde provided for hive deserialized records");
+         }
+         hiveSerde = (SerDe) Class.forName(hiveSerdeClassName).newInstance();
+         hiveSerde.initialize(hadoopConfiguration, tbl);
+         oi = (StructObjectInspector) hiveSerde.getObjectInspector();
+
+         fieldValueBuffer = new ArrayBackedValueStorage();
+         // parseOrderedList and parseUnorderedList stage every list item in this buffer. Nothing else in
+         // the class allocates it, so without this line the first list field fails with a
+         // NullPointerException.
+         listItemBuffer = new ArrayBackedValueStorage();
+         recBuilder = new RecordBuilder();
+         recBuilder.reset(aRecord);
+         recBuilder.init();
+         fieldTypeTags = new byte[n];
+         for (int i = 0; i < n; i++) {
+             ATypeTag tag = aRecord.getFieldTypes()[i].getTypeTag();
+             fieldTypeTags[i] = tag.serialize();
+         }
+         fieldRefs = oi.getAllStructFieldRefs();
+     } catch (Exception e) {
+         throw new HyracksDataException(e);
+     }
+ }
+
+ /**
+  * Deserializes one Hive record with the configured SerDe and writes it to {@code out} as a record of
+  * the configured type. Every declared field is serialized into fieldValueBuffer, prefixed with its
+  * type tag, and added to the record builder by position.
+  *
+  * @param record raw Hive record
+  * @param out destination of the serialized record
+  */
+ @Override
+ public void parse(IRawRecord<? extends Writable> record, DataOutput out) throws Exception {
+     final Object hiveObject = hiveSerde.deserialize(record.get());
+     final int fieldCount = aRecord.getFieldNames().length;
+     final List<Object> values = oi.getStructFieldsDataAsList(hiveObject);
+
+     recBuilder.reset(aRecord);
+     recBuilder.init();
+     for (int pos = 0; pos < fieldCount; pos++) {
+         final Object value = values.get(pos);
+         final ObjectInspector inspector = fieldRefs.get(pos).getFieldObjectInspector();
+         fieldValueBuffer.reset();
+         final DataOutput fieldOut = fieldValueBuffer.getDataOutput();
+         fieldOut.writeByte(fieldTypeTags[pos]);
+         parseItem(fieldTypes[pos], value, inspector, fieldOut, false);
+         recBuilder.addField(pos, fieldValueBuffer);
+     }
+     recBuilder.write(out, true);
+ }
+
+ private void parseItem(IAType itemType, Object value, ObjectInspector foi, DataOutput dataOutput,
+ boolean primitiveOnly) throws IOException {
+ switch (itemType.getTypeTag()) {
+ case BOOLEAN:
+ parseBoolean(value, (BooleanObjectInspector) foi, dataOutput);
+ break;
+ case TIME:
+ parseTime(value, (TimestampObjectInspector) foi, dataOutput);
+ break;
+ case DATE:
+ parseDate(value, (TimestampObjectInspector) foi, dataOutput);
+ break;
+ case DATETIME:
+ parseDateTime(value, (TimestampObjectInspector) foi, dataOutput);
+ break;
+ case DOUBLE:
+ parseDouble(value, (DoubleObjectInspector) foi, dataOutput);
+ break;
+ case FLOAT:
+ parseFloat(value, (FloatObjectInspector) foi, dataOutput);
+ break;
+ case INT8:
+ parseInt8(value, (ByteObjectInspector) foi, dataOutput);
+ break;
+ case INT16:
+ parseInt16(value, (ShortObjectInspector) foi, dataOutput);
+ break;
+ case INT32:
+ parseInt32(value, (IntObjectInspector) foi, dataOutput);
+ break;
+ case INT64:
+ parseInt64(value, (LongObjectInspector) foi, dataOutput);
+ break;
+ case STRING:
+ parseString(value, (StringObjectInspector) foi, dataOutput);
+ break;
+ case ORDEREDLIST:
+ if (primitiveOnly) {
+ throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
+ }
+ parseOrderedList((AOrderedListType) itemType, value, (ListObjectInspector) foi);
+ break;
+ case UNORDEREDLIST:
+ if (primitiveOnly) {
+ throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
+ }
+ parseUnorderedList((AUnorderedListType) itemType, value, (ListObjectInspector) foi);
+ break;
+ default:
+ throw new HyracksDataException("Can't get hive type for field of type " + itemType.getTypeTag());
+ }
+ }
+
+ /** Returns the raw record class this parser accepts, which is always Writable. */
+ @Override
+ public Class<? extends Writable> getRecordClass() {
+ return Writable.class;
+ }
+
+ private Object getColTypes(ARecordType record) throws Exception {
+ int n = record.getFieldTypes().length;
+ if (n < 1) {
+ throw new HyracksDataException("Failed to get columns of record");
+ }
+ //First Column
+ String cols = getHiveTypeString(record.getFieldTypes(), 0);
+ for (int i = 1; i < n; i++) {
+ cols = cols + "," + getHiveTypeString(record.getFieldTypes(), i);
+ }
+ return cols;
+ }
+
+ private String getCommaDelimitedColNames(ARecordType record) throws Exception {
+ if (record.getFieldNames().length < 1) {
+ throw new HyracksDataException("Can't deserialize hive records with no closed columns");
+ }
+
+ String cols = record.getFieldNames()[0];
+ for (int i = 1; i < record.getFieldNames().length; i++) {
+ cols = cols + "," + record.getFieldNames()[i];
+ }
+ return cols;
+ }
+
+ /**
+  * Translates the Asterix type at position {@code i} into the matching Hive type name. For an optional
+  * (nullable) union, the type name of its nullable member is used; any other union is rejected.
+  *
+  * @param types field types of the record
+  * @param i position of the field to translate
+  * @return the Hive type name constant for the field
+  * @throws NotImplementedException for a union that is not optional, or when no type tag is available
+  * @throws HyracksDataException for a type without a Hive counterpart
+  */
+ private String getHiveTypeString(IAType[] types, int i) throws Exception {
+     final IAType type = types[i];
+     ATypeTag tag = type.getTypeTag();
+     if (tag == ATypeTag.UNION) {
+         // Only optional unions can be unwrapped. The check used to be inverted, which rejected exactly
+         // the unions the next line knows how to handle.
+         if (!NonTaggedFormatUtil.isOptional(type)) {
+             throw new NotImplementedException("Non-optional UNION type is not supported.");
+         }
+         tag = ((AUnionType) type).getNullableType().getTypeTag();
+     }
+     if (tag == null) {
+         throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+     }
+     switch (tag) {
+         case BOOLEAN:
+             return Constants.BOOLEAN_TYPE_NAME;
+         case DATE:
+             return Constants.DATE_TYPE_NAME;
+         case DATETIME:
+             return Constants.DATETIME_TYPE_NAME;
+         case DOUBLE:
+             return Constants.DOUBLE_TYPE_NAME;
+         case FLOAT:
+             return Constants.FLOAT_TYPE_NAME;
+         case INT16:
+             return Constants.SMALLINT_TYPE_NAME;
+         case INT32:
+             return Constants.INT_TYPE_NAME;
+         case INT64:
+             return Constants.BIGINT_TYPE_NAME;
+         case INT8:
+             return Constants.TINYINT_TYPE_NAME;
+         case ORDEREDLIST:
+             return Constants.LIST_TYPE_NAME;
+         case STRING:
+             return Constants.STRING_TYPE_NAME;
+         case TIME:
+             return Constants.DATETIME_TYPE_NAME;
+         case UNORDEREDLIST:
+             return Constants.LIST_TYPE_NAME;
+         default:
+             throw new HyracksDataException("Can't get hive type for field of type " + tag);
+     }
+ }
+
+ private void parseInt64(Object obj, LongObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(foi.get(obj));
+ }
+
+ private void parseInt32(Object obj, IntObjectInspector foi, DataOutput dataOutput) throws IOException {
+ if (obj == null) {
+ throw new HyracksDataException("can't parse null field");
+ }
+ dataOutput.writeInt(foi.get(obj));
+ }
+
+ private void parseInt16(Object obj, ShortObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeShort(foi.get(obj));
+ }
+
+ private void parseFloat(Object obj, FloatObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeFloat(foi.get(obj));
+ }
+
+ private void parseDouble(Object obj, DoubleObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeDouble(foi.get(obj));
+ }
+
+ private void parseDateTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeLong(foi.getPrimitiveJavaObject(obj).getTime());
+ }
+
+ private void parseDate(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
+ long chrononTimeInMs = foi.getPrimitiveJavaObject(obj).getTime();
+ short temp = 0;
+ if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
+ temp = 1;
+ }
+ dataOutput.writeInt((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - temp);
+ }
+
+ private void parseBoolean(Object obj, BooleanObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeBoolean(foi.get(obj));
+ }
+
+ private void parseInt8(Object obj, ByteObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeByte(foi.get(obj));
+ }
+
+ private void parseString(Object obj, StringObjectInspector foi, DataOutput dataOutput) throws IOException {
+ utf8Writer.writeUTF8(foi.getPrimitiveJavaObject(obj), dataOutput);
+ }
+
+ private void parseTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
+ dataOutput.writeInt((int) (foi.getPrimitiveJavaObject(obj).getTime() % 86400000));
+ }
+
+ private void parseOrderedList(AOrderedListType aOrderedListType, Object obj, ListObjectInspector foi)
+ throws IOException {
+ OrderedListBuilder orderedListBuilder = getOrderedListBuilder();
+ IAType itemType = null;
+ if (aOrderedListType != null)
+ itemType = aOrderedListType.getItemType();
+ orderedListBuilder.reset(aOrderedListType);
+
+ int n = foi.getListLength(obj);
+ for (int i = 0; i < n; i++) {
+ Object element = foi.getListElement(obj, i);
+ ObjectInspector eoi = foi.getListElementObjectInspector();
+ if (element == null) {
+ throw new HyracksDataException("can't parse hive list with null values");
+ }
+ parseItem(itemType, element, eoi, listItemBuffer.getDataOutput(), true);
+ orderedListBuilder.addItem(listItemBuffer);
+ }
+ orderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
+ }
+
+ /**
+  * Serializes a Hive list as an unordered list into fieldValueBuffer. Each element is staged in
+  * listItemBuffer, prefixed with the item type tag, before being added to the builder.
+  *
+  * @param uoltype Asterix list type; its item type drives how each element is parsed. A null type is
+  *            not usable here, because the item type tag is read unconditionally
+  * @param obj Hive list value
+  * @param oi Hive inspector giving access to the list length and elements (hides the field of the
+  *            same name)
+  * @throws HyracksDataException when the list contains a null element
+  */
+ private void parseUnorderedList(AUnorderedListType uoltype, Object obj, ListObjectInspector oi) throws IOException {
+     final UnorderedListBuilder builder = getUnorderedListBuilder();
+     final IAType itemType = (uoltype == null) ? null : uoltype.getItemType();
+     final byte tagByte = itemType.getTypeTag().serialize();
+     builder.reset(uoltype);
+
+     final int length = oi.getListLength(obj);
+     for (int idx = 0; idx < length; idx++) {
+         final Object element = oi.getListElement(obj, idx);
+         final ObjectInspector elementInspector = oi.getListElementObjectInspector();
+         if (element == null) {
+             throw new HyracksDataException("can't parse hive list with null values");
+         }
+         listItemBuffer.reset();
+         final DataOutput itemOut = listItemBuffer.getDataOutput();
+         itemOut.writeByte(tagByte);
+         parseItem(itemType, element, elementInspector, itemOut, true);
+         builder.addItem(listItemBuffer);
+     }
+     builder.write(fieldValueBuffer.getDataOutput(), true);
+ }
+
+ /** Returns the shared ordered-list builder, creating it on first use. */
+ private OrderedListBuilder getOrderedListBuilder() {
+     if (orderedListBuilder == null) {
+         orderedListBuilder = new OrderedListBuilder();
+     }
+     return orderedListBuilder;
+ }
+
+ /** Returns the shared unordered-list builder, creating it on first use. */
+ private UnorderedListBuilder getUnorderedListBuilder() {
+     if (unorderedListBuilder == null) {
+         unorderedListBuilder = new UnorderedListBuilder();
+     }
+     return unorderedListBuilder;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
new file mode 100644
index 0000000..4d93dc5
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RSSParser.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.om.base.AMutableRecord;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+public class RSSParser implements IRecordDataParser<SyndEntryImpl> {
+ private long id = 0;
+ private String idPrefix;
+ private AMutableString[] mutableFields;
+ private String[] tupleFieldValues;
+ private AMutableRecord mutableRecord;
+ private RecordBuilder recordBuilder = new RecordBuilder();
+ private int numFields;
+
+ /** Always returns DataSourceType.RECORDS; this class implements only IRecordDataParser. */
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ /**
+  * Allocates the reusable record state: four mutable string fields, the mutable record wrapping them
+  * and a value array with one slot per declared field.
+  *
+  * NOTE(review): parse(...) fills positions 0-3 and then copies numFields values, so {@code recordType}
+  * is presumably expected to declare exactly four fields; nothing here checks that.
+  *
+  * @param configuration adapter configuration (not read here)
+  * @param recordType type of the records to produce
+  */
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType recordType)
+         throws HyracksDataException, IOException {
+     final AMutableString[] fields = new AMutableString[4];
+     for (int i = 0; i < fields.length; i++) {
+         fields[i] = new AMutableString(null);
+     }
+     mutableFields = fields;
+     mutableRecord = new AMutableRecord(recordType, mutableFields);
+     numFields = recordType.getFieldNames().length;
+     tupleFieldValues = new String[numFields];
+ }
+
+ /**
+  * Turns one feed entry into a record of four strings - generated id, title, description and link -
+  * and writes it to {@code out}. The id counter is advanced only after the record has been written.
+  *
+  * NOTE(review): idPrefix is never assigned anywhere in this class, so the generated id currently
+  * starts with the text "null" - confirm where the prefix is supposed to come from.
+  *
+  * @param record raw record wrapping the feed entry
+  * @param out destination of the serialized record
+  */
+ @Override
+ public void parse(IRawRecord<? extends SyndEntryImpl> record, DataOutput out) throws Exception {
+     final SyndEntryImpl entry = record.get();
+     final String[] values = tupleFieldValues;
+     values[0] = idPrefix + ":" + id;
+     values[1] = entry.getTitle();
+     values[2] = entry.getDescription().getValue();
+     values[3] = entry.getLink();
+
+     for (int pos = 0; pos < numFields; pos++) {
+         final AMutableString field = mutableFields[pos];
+         field.setValue(values[pos]);
+         mutableRecord.setValueAtPos(pos, field);
+     }
+
+     recordBuilder.reset(mutableRecord.getType());
+     recordBuilder.init();
+     IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+     id++;
+ }
+
+ /** Returns the raw record class this parser accepts, which is always SyndEntryImpl. */
+ @Override
+ public Class<? extends SyndEntryImpl> getRecordClass() {
+ return SyndEntryImpl.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
new file mode 100644
index 0000000..b9cd60b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.library.java.JObjectUtil;
+import org.apache.asterix.external.util.Datatypes.Tweet;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableRecord;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import twitter4j.Status;
+import twitter4j.User;
+
+public class TweetParser implements IRecordDataParser<Status> {
+
+ private IAObject[] mutableTweetFields;
+ private IAObject[] mutableUserFields;
+ private AMutableRecord mutableRecord;
+ private AMutableRecord mutableUser;
+ private final Map<String, Integer> userFieldNameMap = new HashMap<>();
+ private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
+ private RecordBuilder recordBuilder = new RecordBuilder();
+
+ /** Always returns DataSourceType.RECORDS; this class implements only IRecordDataParser. */
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ /**
+  * Indexes the field names of {@code recordType} and allocates the reusable mutable values: six user
+  * fields wrapped in a nested user record, and six tweet fields (the second one being that user record)
+  * wrapped in the tweet record.
+  *
+  * @param configuration adapter configuration (not read here)
+  * @param recordType tweet record type; the field named Tweet.USER is cast to a record type
+  */
+ @Override
+ public void configure(Map<String, String> configuration, ARecordType recordType)
+         throws HyracksDataException, IOException {
+     initFieldNames(recordType);
+
+     mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
+             new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
+     final int userPos = tweetFieldNameMap.get(Tweet.USER);
+     final ARecordType userType = (ARecordType) recordType.getFieldTypes()[userPos];
+     mutableUser = new AMutableRecord(userType, mutableUserFields);
+
+     mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
+             new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
+     mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+ }
+
+ // Initialize the hashmap values for the field names and positions
+ private void initFieldNames(ARecordType recordType) {
+ String tweetFields[] = recordType.getFieldNames();
+ for (int i = 0; i < tweetFields.length; i++) {
+ tweetFieldNameMap.put(tweetFields[i], i);
+ if (tweetFields[i].equals(Tweet.USER)) {
+ IAType fieldType = recordType.getFieldTypes()[i];
+ if (fieldType.getTypeTag() == ATypeTag.RECORD) {
+ String userFields[] = ((ARecordType) fieldType).getFieldNames();
+ for (int j = 0; j < userFields.length; j++) {
+ userFieldNameMap.put(userFields[j], j);
+ }
+ }
+
+ }
+ }
+ }
+
+ /**
+  * Copies one tweet into the reusable mutable record and writes it to {@code out}. Target fields are
+  * located by name through the maps filled by initFieldNames. A tweet without a geo location gets
+  * latitude and longitude 0.
+  *
+  * @param record raw record wrapping the tweet
+  * @param out destination of the serialized record
+  */
+ @Override
+ public void parse(IRawRecord<? extends Status> record, DataOutput out) throws Exception {
+     final Status tweet = record.get();
+     final User user = tweet.getUser();
+
+     // Nested user record.
+     final AMutableString screenName = (AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)];
+     screenName.setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
+     final AMutableString language = (AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)];
+     language.setValue(JObjectUtil.getNormalizedString(user.getLang()));
+     final AMutableInt32 friends = (AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)];
+     friends.setValue(user.getFriendsCount());
+     final AMutableInt32 statuses = (AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)];
+     statuses.setValue(user.getStatusesCount());
+     final AMutableString userName = (AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)];
+     userName.setValue(JObjectUtil.getNormalizedString(user.getName()));
+     final AMutableInt32 followers = (AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)];
+     followers.setValue(user.getFollowersCount());
+
+     // Tweet id, stored as a string.
+     final AMutableString tweetId = (AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)];
+     tweetId.setValue(String.valueOf(tweet.getId()));
+
+     // Attach the user values to the nested user record.
+     final AMutableRecord userRecord = (AMutableRecord) mutableTweetFields[tweetFieldNameMap.get(Tweet.USER)];
+     for (int i = 0; i < mutableUserFields.length; i++) {
+         userRecord.setValueAtPos(i, mutableUserFields[i]);
+     }
+
+     final AMutableDouble latitude = (AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)];
+     final AMutableDouble longitude = (AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)];
+     if (tweet.getGeoLocation() == null) {
+         latitude.setValue(0);
+         longitude.setValue(0);
+     } else {
+         latitude.setValue(tweet.getGeoLocation().getLatitude());
+         longitude.setValue(tweet.getGeoLocation().getLongitude());
+     }
+
+     final AMutableString createdAt = (AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)];
+     createdAt.setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString()));
+     final AMutableString message = (AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)];
+     message.setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+
+     for (int i = 0; i < mutableTweetFields.length; i++) {
+         mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
+     }
+     recordBuilder.reset(mutableRecord.getType());
+     recordBuilder.init();
+     IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+ }
+
+ /** Returns the raw record class this parser accepts, which is always twitter4j's Status. */
+ @Override
+ public Class<? extends Status> getRecordClass() {
+ return Status.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
new file mode 100644
index 0000000..4634278
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.ADMDataParser;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Parser factory that hands out {@link ADMDataParser} instances for both the
+ * record-based and the stream-based parser interfaces. Each request builds a
+ * new parser and configures it with this factory's {@code configuration} and
+ * {@code recordType}.
+ */
+public class ADMDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @return {@code char[].class}, the record class of the parsers created here
+     */
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Creates a record parser; {@code ctx} is not consulted.
+     */
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        return newConfiguredParser();
+    }
+
+    /**
+     * Creates a stream parser; neither {@code ctx} nor {@code partition} is consulted.
+     */
+    @Override
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return newConfiguredParser();
+    }
+
+    /**
+     * Instantiates an {@link ADMDataParser} and configures it.
+     *
+     * @return the configured parser
+     * @throws HyracksDataException
+     *             wrapping any exception raised while constructing or configuring the parser
+     */
+    private ADMDataParser newConfiguredParser() throws HyracksDataException {
+        ADMDataParser admParser;
+        try {
+            admParser = new ADMDataParser();
+            admParser.configure(configuration, recordType);
+        } catch (Exception cause) {
+            throw new HyracksDataException(cause);
+        }
+        return admParser;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java
new file mode 100644
index 0000000..43af455
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AbstractRecordStreamParserFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.types.ARecordType;
+
+/**
+ * Shared base for parser factories that implement both the stream and the record
+ * parser factory interfaces. It keeps the configuration map and the record type it
+ * is given and derives the data source type from the configuration.
+ *
+ * @param <T>
+ *            type argument passed through to {@link IRecordDataParserFactory}
+ */
+public abstract class AbstractRecordStreamParserFactory<T>
+        implements IStreamDataParserFactory, IRecordDataParserFactory<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Configuration stored by {@link #configure(Map)}. */
+    protected Map<String, String> configuration;
+    /** Record type stored by {@link #setRecordType(ARecordType)}. */
+    protected ARecordType recordType;
+
+    /**
+     * Stores the record type for use by subclasses.
+     */
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /**
+     * Stores the configuration for use by subclasses; the map is kept by reference.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * @return the data source type that {@link ExternalDataUtils} derives from the stored configuration
+     * @throws AsterixException
+     *             propagated from {@link ExternalDataUtils#getDataSourceType}
+     */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return ExternalDataUtils.getDataSourceType(configuration);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
new file mode 100644
index 0000000..fa63d45
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.DelimitedDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * Factory for {@link DelimitedDataParser} instances, usable through both the record
+ * and the stream parser factory interfaces. The delimiter, quote and header flag are
+ * read from the stored configuration each time a parser is created.
+ */
+public class DelimitedDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a record parser; {@code ctx} is not consulted.
+     */
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException {
+        return createParser();
+    }
+
+    /**
+     * Builds a {@link DelimitedDataParser} from the stored configuration and record type,
+     * then configures it.
+     *
+     * @return the configured parser
+     * @throws AsterixException
+     *             if the configured delimiter or quote is invalid
+     */
+    private DelimitedDataParser createParser() throws HyracksDataException, AsterixException {
+        IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
+        // Keep the delimiter primitive: getDelimiter returns char and getQuote takes char,
+        // so a boxed Character here would only be boxed and unboxed for nothing.
+        char delimiter = getDelimiter(configuration);
+        char quote = getQuote(configuration, delimiter);
+        boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
+        DelimitedDataParser parser = new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader);
+        parser.configure(configuration, recordType);
+        return parser;
+    }
+
+    /**
+     * @return {@code char[].class}, the record class of the parsers created here
+     */
+    @Override
+    public Class<? extends char[]> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Creates a stream parser; neither {@code ctx} nor {@code partition} is consulted.
+     */
+    @Override
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException, AsterixException {
+        return createParser();
+    }
+
+    /**
+     * Reads the delimiter from the given configuration.
+     *
+     * @param configuration
+     *            the configuration to read {@link ExternalDataConstants#KEY_DELIMITER} from
+     * @return the configured delimiter, or the first character of
+     *         {@link ExternalDataConstants#DEFAULT_DELIMITER} when none is configured
+     * @throws AsterixException
+     *             if a delimiter is configured but is not exactly one character long
+     */
+    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+        String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
+        if (delimiterValue == null) {
+            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
+        } else if (delimiterValue.length() != 1) {
+            throw new AsterixException(
+                    "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1.");
+        }
+        return delimiterValue.charAt(0);
+    }
+
+    /**
+     * Reads the quote character from the given configuration and verifies that it does
+     * not collide with the delimiter.
+     *
+     * @param configuration
+     *            the configuration to read {@link ExternalDataConstants#KEY_QUOTE} from
+     * @param delimiter
+     *            the delimiter in use, needed to check that both do not share the same character
+     * @return the configured quote, or the first character of
+     *         {@link ExternalDataConstants#DEFAULT_QUOTE} when none is configured
+     * @throws AsterixException
+     *             if a quote is configured but is not exactly one character long, or if the
+     *             quote equals the delimiter
+     */
+    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+        String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteValue == null) {
+            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
+        } else if (quoteValue.length() != 1) {
+            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+        }
+
+        // Since delimiter (char type value) can't be null,
+        // we only check whether delimiter and quote use the same character
+        if (quoteValue.charAt(0) == delimiter) {
+            throw new AsterixException(
+                    "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
+        }
+
+        return quoteValue.charAt(0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
new file mode 100644
index 0000000..f07ba4c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.parser.HiveRecordParser;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Record parser factory that produces {@link HiveRecordParser} instances for
+ * {@link Writable} records. It always reports the {@code RECORDS} data source type.
+ */
+public class HiveDataParserFactory implements IRecordDataParserFactory<Writable> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Record type handed to every parser this factory creates. */
+    private ARecordType recordType;
+    /** Configuration handed to every parser this factory creates. */
+    private Map<String, String> configuration;
+
+    /**
+     * Stores the record type for later parser creation.
+     */
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /**
+     * Stores the configuration for later parser creation; the map is kept by reference.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * @return {@link DataSourceType#RECORDS}, unconditionally
+     */
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * @return {@code Writable.class}, the record class of the parsers created here
+     */
+    @Override
+    public Class<? extends Writable> getRecordClass() {
+        return Writable.class;
+    }
+
+    /**
+     * Creates a {@link HiveRecordParser} and configures it with the stored configuration
+     * and record type; {@code ctx} is not consulted.
+     */
+    @Override
+    public IRecordDataParser<Writable> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException {
+        final HiveRecordParser parser = new HiveRecordParser();
+        parser.configure(configuration, recordType);
+        return parser;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
new file mode 100644
index 0000000..fecb0de
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.parser.RSSParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+
+/**
+ * Record parser factory that produces {@link RSSParser} instances for
+ * {@link SyndEntryImpl} records. It always reports the {@code RECORDS} data source type.
+ */
+public class RSSParserFactory implements IRecordDataParserFactory<SyndEntryImpl> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Configuration handed to every parser this factory creates. */
+    private Map<String, String> configuration;
+    /** Record type handed to every parser this factory creates. */
+    private ARecordType recordType;
+
+    /**
+     * Stores the record type for later parser creation.
+     */
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /**
+     * Stores the configuration for later parser creation; the map is kept by reference.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    /**
+     * @return {@link DataSourceType#RECORDS}, unconditionally
+     */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * @return {@code SyndEntryImpl.class}, the record class of the parsers created here
+     */
+    @Override
+    public Class<? extends SyndEntryImpl> getRecordClass() {
+        return SyndEntryImpl.class;
+    }
+
+    /**
+     * Creates an {@link RSSParser} and configures it with the stored configuration and
+     * record type; {@code ctx} is not consulted.
+     */
+    @Override
+    public IRecordDataParser<SyndEntryImpl> createRecordParser(IHyracksTaskContext ctx)
+            throws AsterixException, IOException {
+        final RSSParser rssParser = new RSSParser();
+        rssParser.configure(configuration, recordType);
+        return rssParser;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
new file mode 100644
index 0000000..0f3b309
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.parser.TweetParser;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import twitter4j.Status;
+
+/**
+ * Record parser factory that produces {@link TweetParser} instances for
+ * {@link Status} records. It always reports the {@code RECORDS} data source type.
+ */
+public class TweetParserFactory implements IRecordDataParserFactory<Status> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Configuration handed to every parser this factory creates. */
+    private Map<String, String> configuration;
+    /** Record type handed to every parser this factory creates. */
+    private ARecordType recordType;
+
+    /**
+     * Stores the record type for later parser creation.
+     */
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    /**
+     * Stores the configuration for later parser creation; the map is kept by reference.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+    /**
+     * @return {@link DataSourceType#RECORDS}, unconditionally
+     */
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * @return {@code Status.class}, the record class of the parsers created here
+     */
+    @Override
+    public Class<? extends Status> getRecordClass() {
+        return Status.class;
+    }
+
+    /**
+     * Creates a {@link TweetParser} and configures it with the stored configuration and
+     * record type; {@code ctx} is not consulted.
+     */
+    @Override
+    public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) throws AsterixException, IOException {
+        final TweetParser tweetParser = new TweetParser();
+        tweetParser.configure(configuration, recordType);
+        return tweetParser;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
new file mode 100644
index 0000000..649ca43
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
+import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.GenericAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.external.runtime.GenericSocketFeedAdapter;
+import org.apache.asterix.external.runtime.GenericSocketFeedAdapterFactory;
+import org.apache.asterix.external.runtime.SocketClientAdapter;
+import org.apache.asterix.external.runtime.SocketClientAdapterFactory;
+import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
+
+/**
+ * Registry of adapter factory classes, keyed by adapter class name or alias, together
+ * with helpers that instantiate and configure a factory for a requested adapter.
+ */
+public class AdapterFactoryProvider {
+
+    /**
+     * Maps adapter class names and aliases to their factory classes. Entries are added at
+     * class initialization and later by {@link #addNewAdapter}.
+     */
+    public static final Map<String, Class<? extends IAdapterFactory>> adapterFactories = initializeAdapterFactoryMapping();
+
+    /**
+     * Builds the initial mapping of built-in adapter class names, aliases and legacy
+     * class names to factory classes.
+     */
+    private static Map<String, Class<? extends IAdapterFactory>> initializeAdapterFactoryMapping() {
+        Map<String, Class<? extends IAdapterFactory>> mapping = new HashMap<String, Class<? extends IAdapterFactory>>();
+        // Class names
+        mapping.put(GenericAdapter.class.getName(), GenericAdapterFactory.class);
+        mapping.put(GenericSocketFeedAdapter.class.getName(), GenericSocketFeedAdapterFactory.class);
+        mapping.put(SocketClientAdapter.class.getName(), SocketClientAdapterFactory.class);
+
+        // Aliases
+        mapping.put(ExternalDataConstants.ALIAS_GENERIC_ADAPTER, GenericAdapterFactory.class);
+        mapping.put(ExternalDataConstants.ALIAS_HDFS_ADAPTER, GenericAdapterFactory.class);
+        mapping.put(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, GenericAdapterFactory.class);
+        mapping.put(ExternalDataConstants.ALIAS_SOCKET_ADAPTER, GenericSocketFeedAdapterFactory.class);
+        mapping.put(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER, SocketClientAdapterFactory.class);
+        mapping.put(ExternalDataConstants.ALIAS_FILE_FEED_ADAPTER, GenericAdapterFactory.class);
+
+        // Compatibility
+        mapping.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class);
+        mapping.put(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME, GenericAdapterFactory.class);
+        return mapping;
+    }
+
+    /**
+     * Resolves the factory class registered for an adapter name with a single map lookup.
+     *
+     * @param adapterClassname
+     *            adapter class name or alias
+     * @return the registered factory class, never {@code null}
+     * @throws AsterixException
+     *             if nothing is registered under the given name
+     */
+    private static Class<? extends IAdapterFactory> lookupFactoryClass(String adapterClassname)
+            throws AsterixException {
+        Class<? extends IAdapterFactory> factoryClass = adapterFactories.get(adapterClassname);
+        if (factoryClass == null) {
+            throw new AsterixException("Unknown adapter: " + adapterClassname);
+        }
+        return factoryClass;
+    }
+
+    /**
+     * Instantiates and configures the adapter factory registered for the given adapter.
+     *
+     * @param adapterClassname
+     *            adapter class name or alias
+     * @param configuration
+     *            adapter configuration; passed to the compatibility helper before the lookup
+     * @param itemType
+     *            record type the factory is configured with
+     * @return the configured factory
+     * @throws Exception
+     *             an {@link AsterixException} if the adapter is unknown, or whatever
+     *             instantiation or configuration raises
+     */
+    public static IAdapterFactory getAdapterFactory(String adapterClassname, Map<String, String> configuration,
+            ARecordType itemType) throws Exception {
+        ExternalDataCompatibilityUtils.addCompatabilityParameters(adapterClassname, itemType, configuration);
+        IAdapterFactory adapterFactory = lookupFactoryClass(adapterClassname).newInstance();
+        adapterFactory.configure(configuration, itemType);
+        return adapterFactory;
+    }
+
+    /**
+     * Instantiates and configures an indexing adapter factory for the given adapter.
+     *
+     * @param adapterClassname
+     *            adapter class name or alias
+     * @param configuration
+     *            adapter configuration; passed to the compatibility helper before the lookup
+     * @param itemType
+     *            record type the factory is configured with
+     * @param snapshot
+     *            external files handed to the factory via {@code setSnapshot}
+     * @param indexingOp
+     *            flag handed to the factory via {@code setSnapshot}
+     * @return the configured indexing factory
+     * @throws AsterixException
+     *             if the adapter is unknown, or if the factory cannot be instantiated, is not
+     *             an {@link IIndexingAdapterFactory}, or fails to configure
+     */
+    public static IIndexingAdapterFactory getAdapterFactory(String adapterClassname, Map<String, String> configuration,
+            ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp)
+            throws AsterixException, InstantiationException, IllegalAccessException {
+        ExternalDataCompatibilityUtils.addCompatabilityParameters(adapterClassname, itemType, configuration);
+        Class<? extends IAdapterFactory> factoryClass = lookupFactoryClass(adapterClassname);
+        try {
+            IIndexingAdapterFactory adapterFactory = (IIndexingAdapterFactory) factoryClass.newInstance();
+            adapterFactory.setSnapshot(snapshot, indexingOp);
+            adapterFactory.configure(configuration, itemType);
+            return adapterFactory;
+        } catch (Exception e) {
+            throw new AsterixException("Failed to create indexing adapter factory.", e);
+        }
+    }
+
+    /**
+     * Loads an adapter factory class from a library's class loader and registers it under
+     * both the adapter class name and the adapter alias.
+     * NOTE(review): the registry is a plain HashMap mutated here; confirm that adapters are
+     * never registered concurrently with lookups.
+     *
+     * @throws ClassNotFoundException
+     *             if the factory class cannot be loaded from the library class loader
+     */
+    @SuppressWarnings("unchecked")
+    public static void addNewAdapter(String dataverseName, String adapterClassName, String adapterAlias,
+            String adapterFactoryClassName, String libraryName) throws ClassNotFoundException {
+        ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(dataverseName, libraryName);
+        Class<? extends IAdapterFactory> adapterFactoryClass = (Class<? extends IAdapterFactory>) classLoader
+                .loadClass(adapterFactoryClassName);
+        adapterFactories.put(adapterClassName, adapterFactoryClass);
+        adapterFactories.put(adapterAlias, adapterFactoryClass);
+    }
+
+    /**
+     * Creates and configures a {@link LookupAdapterFactory} from the given arguments.
+     *
+     * @return the configured lookup adapter factory
+     * @throws Exception
+     *             whatever construction or configuration raises
+     */
+    public static LookupAdapterFactory<?> getAdapterFactory(Map<String, String> configuration, ARecordType recordType,
+            int[] ridFields, boolean retainInput, boolean retainNull, INullWriterFactory iNullWriterFactory)
+            throws Exception {
+        LookupAdapterFactory<?> adapterFactory = new LookupAdapterFactory<>(recordType, ridFields, retainInput,
+                retainNull, iNullWriterFactory);
+        adapterFactory.configure(configuration);
+        return adapterFactory;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
new file mode 100644
index 0000000..68a3942
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.api.IStreamDataParserFactory;
+import org.apache.asterix.external.dataflow.IndexingDataFlowController;
+import org.apache.asterix.external.dataflow.RecordDataFlowController;
+import org.apache.asterix.external.dataflow.StreamDataFlowController;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Creates the data flow controller that matches the type of an external data source
+ * and wires it with its reader/stream and parser.
+ */
+public class DataflowControllerProvider {
+
+    /**
+     * Builds a data flow controller for the given data source and parser factories.
+     * Order of calls:
+     * 1. Constructor()
+     * 2. configure(configuration,ctx)
+     * 3. setTupleForwarder(forwarder)
+     * 4. if record flow controller
+     * |-a. Set record reader
+     * |-b. Set record parser
+     * else
+     * |-a. Set stream parser
+     * 5. start(writer)
+     *
+     * @throws Exception
+     *             an {@link AsterixException} for an unrecognized data source type, or
+     *             whatever the factories, readers, parsers or controllers raise
+     */
+    public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
+            int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
+            Map<String, String> configuration, boolean indexingOp) throws Exception {
+        switch (dataSourceFactory.getDataSourceType()) {
+            case RECORDS:
+                return buildRecordController(recordType, ctx, partition, dataSourceFactory, dataParserFactory,
+                        configuration, indexingOp);
+            case STREAM:
+                return buildStreamController(recordType, ctx, partition, dataSourceFactory, dataParserFactory,
+                        configuration);
+            default:
+                throw new AsterixException("Unknown data source type: " + dataSourceFactory.getDataSourceType());
+        }
+    }
+
+    /**
+     * Wires a record-based controller: an indexing controller when {@code indexingOp} is set,
+     * a plain record controller otherwise.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static IDataFlowController buildRecordController(ARecordType recordType, IHyracksTaskContext ctx,
+            int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
+            Map<String, String> configuration, boolean indexingOp) throws Exception {
+        RecordDataFlowController controller = indexingOp ? new IndexingDataFlowController()
+                : new RecordDataFlowController();
+        controller.configure(configuration, ctx);
+        controller.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
+        // The reader is created before the parser, and both are created before either is attached.
+        IRecordReader<?> reader = ((IRecordReaderFactory<?>) dataSourceFactory).createRecordReader(ctx, partition);
+        IRecordDataParser<?> parser = ((IRecordDataParserFactory<?>) dataParserFactory).createRecordParser(ctx);
+        parser.configure(configuration, recordType);
+        controller.setRecordReader(reader);
+        controller.setRecordParser(parser);
+        return controller;
+    }
+
+    /**
+     * Wires a stream-based controller with a parser reading from the provider's input stream.
+     */
+    private static IDataFlowController buildStreamController(ARecordType recordType, IHyracksTaskContext ctx,
+            int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
+            Map<String, String> configuration) throws Exception {
+        StreamDataFlowController controller = new StreamDataFlowController();
+        controller.configure(configuration, ctx);
+        controller.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration));
+        IInputStreamProvider provider = ((IInputStreamProviderFactory) dataSourceFactory)
+                .createInputStreamProvider(ctx, partition);
+        IStreamDataParserFactory parserFactory = (IStreamDataParserFactory) dataParserFactory;
+        parserFactory.configure(configuration);
+        IStreamDataParser parser = parserFactory.createInputStreamParser(ctx, partition);
+        parser.configure(configuration, recordType);
+        parser.setInputStream(provider.getInputStream());
+        controller.setStreamParser(parser);
+        return controller;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
new file mode 100644
index 0000000..c69e12c
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IInputStreamProviderFactory;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
+import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+/**
+ * Static helpers that map an adapter configuration to the external data source factory it names:
+ * either a record reader factory or an input stream provider factory.
+ */
+public class DatasourceFactoryProvider {
+
+    /**
+     * Creates the data source factory for the data source type that
+     * {@link ExternalDataUtils#getDataSourceType} reports for the configuration.
+     *
+     * @param configuration
+     *            the adapter configuration
+     * @return a record reader factory for {@code RECORDS}, or an input stream provider factory (looked up
+     *         by the value of {@link ExternalDataConstants#KEY_STREAM}) for {@code STREAM}
+     * @throws Exception
+     *             if the data source type is not handled here or the factory cannot be created
+     */
+    public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
+            throws Exception {
+        switch (ExternalDataUtils.getDataSourceType(configuration)) {
+            case RECORDS:
+                return getRecordReaderFactory(configuration);
+            case STREAM:
+                return getInputStreamFactory(configuration.get(ExternalDataConstants.KEY_STREAM), configuration);
+            default:
+                // Fail loudly rather than handing the caller a null factory to trip over later.
+                throw new AsterixException("Unknown data source type");
+        }
+    }
+
+    /**
+     * Creates the input stream provider factory identified by {@code stream}.
+     *
+     * @param stream
+     *            the stream name: either a name that {@link ExternalDataUtils#isExternal} accepts, or one
+     *            of the built-in {@code ExternalDataConstants.STREAM_*} names
+     * @param configuration
+     *            the adapter configuration (used to resolve the dataverse of an external factory)
+     * @return the matching input stream provider factory
+     * @throws Exception
+     *             if {@code stream} is {@code null} or unknown, or the external factory cannot be created
+     */
+    public static IInputStreamProviderFactory getInputStreamFactory(String stream, Map<String, String> configuration)
+            throws Exception {
+        if (stream == null) {
+            // A switch on a null String would otherwise surface as a message-less NullPointerException.
+            throw new AsterixException("no input stream was specified");
+        }
+        if (ExternalDataUtils.isExternal(stream)) {
+            String dataverse = ExternalDataUtils.getDataverse(configuration);
+            return ExternalDataUtils.createExternalInputStreamFactory(dataverse, stream);
+        }
+        switch (stream) {
+            case ExternalDataConstants.STREAM_HDFS:
+                return new HDFSDataSourceFactory();
+            case ExternalDataConstants.STREAM_LOCAL_FILESYSTEM:
+                return new LocalFSInputStreamProviderFactory();
+            case ExternalDataConstants.STREAM_SOCKET:
+                return new SocketInputStreamProviderFactory();
+            default:
+                throw new AsterixException("unknown input stream factory: " + stream);
+        }
+    }
+
+    /**
+     * Creates the record reader factory named by the value of {@link ExternalDataConstants#KEY_READER}.
+     *
+     * @param configuration
+     *            the adapter configuration
+     * @return the matching record reader factory; the semi-structured and line readers are wired to the
+     *         input stream factory named by {@link ExternalDataUtils#getRecordReaderStreamName}
+     * @throws Exception
+     *             if the reader is missing or unknown, or a required factory cannot be created
+     */
+    public static IRecordReaderFactory<?> getRecordReaderFactory(Map<String, String> configuration) throws Exception {
+        String reader = configuration.get(ExternalDataConstants.KEY_READER);
+        if (reader == null) {
+            // A switch on a null String would otherwise surface as a message-less NullPointerException.
+            throw new AsterixException("missing configuration parameter: " + ExternalDataConstants.KEY_READER);
+        }
+        if (ExternalDataUtils.isExternal(reader)) {
+            String dataverse = ExternalDataUtils.getDataverse(configuration);
+            return ExternalDataUtils.createExternalRecordReaderFactory(dataverse, reader);
+        }
+        switch (reader) {
+            case ExternalDataConstants.READER_HDFS:
+                return new HDFSDataSourceFactory();
+            case ExternalDataConstants.READER_ADM:
+            case ExternalDataConstants.READER_SEMISTRUCTURED:
+                return new SemiStructuredRecordReaderFactory()
+                        .setInputStreamFactoryProvider(getRecordReaderStreamFactory(configuration));
+            case ExternalDataConstants.READER_DELIMITED:
+                return new LineRecordReaderFactory()
+                        .setInputStreamFactoryProvider(getRecordReaderStreamFactory(configuration));
+            default:
+                throw new AsterixException("unknown record reader factory: " + reader);
+        }
+    }
+
+    /** Resolves the input stream factory that backs a stream-based record reader. */
+    private static IInputStreamProviderFactory getRecordReaderStreamFactory(Map<String, String> configuration)
+            throws Exception {
+        return getInputStreamFactory(ExternalDataUtils.getRecordReaderStreamName(configuration), configuration);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java
new file mode 100644
index 0000000..3c090a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ExternalIndexerProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalIndexer;
+import org.apache.asterix.external.indexing.FileOffsetIndexer;
+import org.apache.asterix.external.indexing.RecordColumnarIndexer;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+/**
+ * Chooses the {@link IExternalIndexer} implementation that corresponds to the input format named in
+ * an adapter configuration.
+ */
+public class ExternalIndexerProvider {
+
+    /**
+     * Creates an indexer for the input format stored under {@link ExternalDataConstants#KEY_INPUT_FORMAT}.
+     * The value is trimmed and compared case-insensitively against the text, sequence and RC input
+     * format constants (short names and class names).
+     *
+     * @param configuration
+     *            the adapter configuration
+     * @return a {@link FileOffsetIndexer} for the text and sequence formats, a
+     *         {@link RecordColumnarIndexer} for the RC format
+     * @throws AsterixException
+     *             if the input format parameter is missing or names a format with no indexer
+     */
+    public static IExternalIndexer getIndexer(Map<String, String> configuration) throws AsterixException {
+        String inputFormatParameter = configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT);
+        if (inputFormatParameter == null) {
+            // Report the missing parameter instead of failing with a NullPointerException on trim().
+            throw new AsterixException("Unable to create indexer: missing configuration parameter "
+                    + ExternalDataConstants.KEY_INPUT_FORMAT);
+        }
+        inputFormatParameter = inputFormatParameter.trim();
+        if (equalsAnyIgnoreCase(inputFormatParameter, ExternalDataConstants.INPUT_FORMAT_TEXT,
+                ExternalDataConstants.CLASS_NAME_TEXT_INPUT_FORMAT, ExternalDataConstants.INPUT_FORMAT_SEQUENCE,
+                ExternalDataConstants.CLASS_NAME_SEQUENCE_INPUT_FORMAT)) {
+            return new FileOffsetIndexer();
+        }
+        if (equalsAnyIgnoreCase(inputFormatParameter, ExternalDataConstants.INPUT_FORMAT_RC,
+                ExternalDataConstants.CLASS_NAME_RC_INPUT_FORMAT)) {
+            return new RecordColumnarIndexer();
+        }
+        throw new AsterixException("Unable to create indexer for data with format: " + inputFormatParameter);
+    }
+
+    /** Returns true if {@code value} equals, ignoring case, at least one of {@code candidates}. */
+    private static boolean equalsAnyIgnoreCase(String value, String... candidates) {
+        for (String candidate : candidates) {
+            if (value.equalsIgnoreCase(candidate)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
new file mode 100644
index 0000000..f5a0512
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
+import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
+import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
+import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.TweetParserFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+
+public class ParserFactoryProvider {
+ public static IDataParserFactory getDataParserFactory(Map<String, String> configuration)
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException, AsterixException {
+ IDataParserFactory parserFactory = null;
+ String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+ if (parserFactoryName != null && ExternalDataUtils.isExternal(parserFactoryName)) {
+ return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
+ parserFactoryName);
+ } else {
+ parserFactory = ParserFactoryProvider.getParserFactory(configuration);
+ }
+ return parserFactory;
+ }
+
+ private static IDataParserFactory getParserFactory(Map<String, String> configuration) throws AsterixException {
+ String recordFormat = ExternalDataUtils.getRecordFormat(configuration);
+ switch (recordFormat) {
+ case ExternalDataConstants.FORMAT_ADM:
+ case ExternalDataConstants.FORMAT_JSON:
+ return new ADMDataParserFactory();
+ case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
+ return new DelimitedDataParserFactory();
+ case ExternalDataConstants.FORMAT_HIVE:
+ return new HiveDataParserFactory();
+ case ExternalDataConstants.FORMAT_TWEET:
+ return new TweetParserFactory();
+ case ExternalDataConstants.FORMAT_RSS:
+ return new RSSParserFactory();
+ default:
+ throw new AsterixException("Unknown data format");
+ }
+ }
+}