Posted to commits@hawq.apache.org by sh...@apache.org on 2015/11/03 01:36:13 UTC
[10/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java
new file mode 100644
index 0000000..317040f
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java
@@ -0,0 +1,389 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_INDICATED;
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_ON_CLASSPATH;
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
+
+/**
+ * Class AvroResolver handles deserialization of records that were serialized
+ * using the AVRO serialization framework.
+ */
+public class AvroResolver extends Plugin implements ReadResolver {
+ private GenericRecord avroRecord = null;
+ private DatumReader<GenericRecord> reader = null;
+ // member kept to enable reuse, and thus avoid repeated allocation
+ private BinaryDecoder decoder = null;
+ private List<Schema.Field> fields = null;
+ private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
+ private static final String MAPKEY_DELIM = ":";
+ private static final String RECORDKEY_DELIM = ":";
+ private static final String COLLECTION_DELIM = ",";
+ private String collectionDelim;
+ private String mapkeyDelim;
+ private String recordkeyDelim;
+
+ /**
+ * Constructs an AvroResolver. Initializes the Avro data structures: the
+ * Avro record's field information and the Avro record reader. All Avro
+ * data is built from the Avro schema, which is based on the *.avsc file
+ * that was passed by the user.
+ *
+ * @param input all input parameters coming from the client
+ * @throws IOException if Avro schema could not be retrieved or parsed
+ */
+ public AvroResolver(InputData input) throws IOException {
+ super(input);
+
+ Schema schema = isAvroFile() ? getAvroSchema(new Configuration(),
+ input.getDataSource())
+ : (new Schema.Parser()).parse(openExternalSchema());
+
+ reader = new GenericDatumReader<>(schema);
+ fields = schema.getFields();
+
+ collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
+ : input.getUserProperty("COLLECTION_DELIM");
+ mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
+ : input.getUserProperty("MAPKEY_DELIM");
+ recordkeyDelim = input.getUserProperty("RECORDKEY_DELIM") == null ? RECORDKEY_DELIM
+ : input.getUserProperty("RECORDKEY_DELIM");
+ }
+
+ /**
+ * Returns a list of the fields of one record. Each record field is
+ * represented by a OneField item. OneField item contains two fields: an
+ * integer representing the field type and a Java Object representing the
+ * field value.
+ */
+ @Override
+ public List<OneField> getFields(OneRow row) throws Exception {
+ avroRecord = makeAvroRecord(row.getData(), avroRecord);
+ List<OneField> record = new LinkedList<OneField>();
+
+ int recordkeyIndex = (inputData.getRecordkeyColumn() == null) ? -1
+ : inputData.getRecordkeyColumn().columnIndex();
+ int currentIndex = 0;
+
+ for (Schema.Field field : fields) {
+ /*
+ * Add the record key if exists
+ */
+ if (currentIndex == recordkeyIndex) {
+ currentIndex += recordkeyAdapter.appendRecordkeyField(record,
+ inputData, row);
+ }
+
+ currentIndex += populateRecord(record,
+ avroRecord.get(field.name()), field.schema());
+ }
+
+ return record;
+ }
+
+ /**
+ * Tests whether the Avro records reside inside an AVRO file. If they do
+ * not, they may reside inside a sequence file, a regular file, etc.
+ *
+ * @return whether the resource is an Avro file
+ */
+ boolean isAvroFile() {
+ return inputData.getAccessor().toLowerCase().contains("avro");
+ }
+
+ /**
+ * The record can arrive from one of two sources: a sequence file or an
+ * AVRO file. If it comes from an AVRO file, it was already obtained as a
+ * {@link GenericRecord} when it was fetched from the file with the
+ * {@link AvroRecordReader}, so in this case a cast is enough.
+ * On the other hand, if the source is a sequence file, then the input
+ * parameter obj holds a byte[] buffer that is in fact one serialized Avro
+ * record. Here we build the Avro record from the flat buffer using the
+ * AVRO API. Then (in both cases) the remaining functions build a
+ * {@code List<OneField>} record from the Avro record.
+ *
+ * @param obj object holding an Avro record
+ * @param reuseRecord Avro record to be reused to create new record from obj
+ * @return Avro record
+ * @throws IOException if creating the Avro record from byte array failed
+ */
+ GenericRecord makeAvroRecord(Object obj, GenericRecord reuseRecord)
+ throws IOException {
+ if (isAvroFile()) {
+ return (GenericRecord) obj;
+ } else {
+ byte[] bytes = ((BytesWritable) obj).getBytes();
+ decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
+ return reader.read(reuseRecord, decoder);
+ }
+ }
+
+ /**
+ * For a given field in the Avro record we extract its value and insert it
+ * into the output {@code List<OneField>} record. An Avro field can be a
+ * primitive type or an array type.
+ *
+ * @param record list of fields to be populated
+ * @param fieldValue field value
+ * @param fieldSchema field schema
+ * @return the number of populated fields
+ */
+ int populateRecord(List<OneField> record, Object fieldValue,
+ Schema fieldSchema) {
+
+ Schema.Type fieldType = fieldSchema.getType();
+ int ret = 0;
+ Object value = fieldValue;
+
+ switch (fieldType) {
+ case ARRAY:
+ if(fieldValue == null) {
+ return addOneFieldToRecord(record, TEXT, fieldValue);
+ }
+ List<OneField> listRecord = new LinkedList<>();
+ ret = setArrayField(listRecord, fieldValue, fieldSchema);
+ addOneFieldToRecord(record, TEXT, String.format("[%s]",
+ HdfsUtilities.toString(listRecord, collectionDelim)));
+ break;
+ case MAP:
+ if(fieldValue == null) {
+ return addOneFieldToRecord(record, TEXT, fieldValue);
+ }
+ List<OneField> mapRecord = new LinkedList<>();
+ ret = setMapField(mapRecord, fieldValue, fieldSchema);
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(mapRecord, collectionDelim)));
+ break;
+ case RECORD:
+ if(fieldValue == null) {
+ return addOneFieldToRecord(record, TEXT, fieldValue);
+ }
+ List<OneField> recRecord = new LinkedList<>();
+ ret = setRecordField(recRecord, fieldValue, fieldSchema);
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(recRecord, collectionDelim)));
+ break;
+ case UNION:
+ /*
+ * When an Avro field is actually a union, we resolve the type
+ * of the union element, and delegate the record update via
+ * recursion
+ */
+ int unionIndex = GenericData.get().resolveUnion(fieldSchema,
+ fieldValue);
+ /**
+ * Retrieve index of the non null data type from the type array
+ * if value is null
+ */
+ if (fieldValue == null) {
+ unionIndex ^= 1;
+ }
+ ret = populateRecord(record, fieldValue,
+ fieldSchema.getTypes().get(unionIndex));
+ break;
+ case ENUM:
+ ret = addOneFieldToRecord(record, TEXT, value);
+ break;
+ case INT:
+ ret = addOneFieldToRecord(record, INTEGER, value);
+ break;
+ case DOUBLE:
+ ret = addOneFieldToRecord(record, FLOAT8, value);
+ break;
+ case STRING:
+ value = (fieldValue != null) ? String.format("%s", fieldValue)
+ : null;
+ ret = addOneFieldToRecord(record, TEXT, value);
+ break;
+ case FLOAT:
+ ret = addOneFieldToRecord(record, REAL, value);
+ break;
+ case LONG:
+ ret = addOneFieldToRecord(record, BIGINT, value);
+ break;
+ case BYTES:
+ ret = addOneFieldToRecord(record, BYTEA, value);
+ break;
+ case BOOLEAN:
+ ret = addOneFieldToRecord(record, BOOLEAN, value);
+ break;
+ case FIXED:
+ ret = addOneFieldToRecord(record, BYTEA, value);
+ break;
+ default:
+ break;
+ }
+ return ret;
+ }
+
+ /**
+ * When an Avro field is actually a record, we iterate through its fields.
+ * For each field, the field name and value are added to a local record
+ * {@code List<OneField>} complexRecord with the necessary delimiter; from
+ * it we create an object of type OneField and insert it into the output
+ * {@code List<OneField>} record.
+ *
+ * @param record list of fields to be populated
+ * @param value field value
+ * @param recSchema record schema
+ * @return number of populated fields
+ */
+ int setRecordField(List<OneField> record, Object value, Schema recSchema) {
+
+ GenericRecord rec = ((GenericData.Record) value);
+ Schema fieldKeySchema = Schema.create(Schema.Type.STRING);
+ int currentIndex = 0;
+ for (Schema.Field field : recSchema.getFields()) {
+ Schema fieldSchema = field.schema();
+ Object fieldValue = rec.get(field.name());
+ List<OneField> complexRecord = new LinkedList<>();
+ populateRecord(complexRecord, field.name(), fieldKeySchema);
+ populateRecord(complexRecord, fieldValue, fieldSchema);
+ addOneFieldToRecord(record, TEXT,
+ HdfsUtilities.toString(complexRecord, recordkeyDelim));
+ currentIndex++;
+ }
+ return currentIndex;
+ }
+
+ /**
+ * When an Avro field is actually a map, we resolve the type of the map
+ * value. For each entry, the key and value are added to a local record,
+ * from which we create an object of type OneField and insert it into the
+ * output {@code List<OneField>} record.
+ *
+ * The unchecked warning is suppressed to enable us to cast fieldValue to a
+ * Map (since the value schema has been identified to be of type map).
+ *
+ * @param record list of fields to be populated
+ * @param fieldValue field value
+ * @param mapSchema map schema
+ * @return number of populated fields
+ */
+ @SuppressWarnings("unchecked")
+ int setMapField(List<OneField> record, Object fieldValue, Schema mapSchema) {
+ Schema keySchema = Schema.create(Schema.Type.STRING);
+ Schema valueSchema = mapSchema.getValueType();
+ Map<String, ?> avroMap = ((Map<String, ?>) fieldValue);
+ for (Map.Entry<String, ?> entry : avroMap.entrySet()) {
+ List<OneField> complexRecord = new LinkedList<>();
+ populateRecord(complexRecord, entry.getKey(), keySchema);
+ populateRecord(complexRecord, entry.getValue(), valueSchema);
+ addOneFieldToRecord(record, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ }
+ return avroMap.size();
+ }
+
+ /**
+ * When an Avro field is actually an array, we resolve the type of the array
+ * element, and for each element in the Avro array, we recursively invoke
+ * the population of {@code List<OneField>} record.
+ *
+ * @param record list of fields to be populated
+ * @param fieldValue field value
+ * @param arraySchema array schema
+ * @return number of populated fields
+ */
+ int setArrayField(List<OneField> record, Object fieldValue,
+ Schema arraySchema) {
+ Schema typeSchema = arraySchema.getElementType();
+ GenericData.Array<?> array = (GenericData.Array<?>) fieldValue;
+ int length = array.size();
+ for (int i = 0; i < length; i++) {
+ populateRecord(record, array.get(i), typeSchema);
+ }
+ return length;
+ }
+
+ /**
+ * Creates the {@link OneField} object and adds it to the output {@code List<OneField>}
+ * record. Strings and byte arrays are held inside special types in the Avro
+ * record so we transfer them to standard types in order to enable their
+ * insertion in the GPDBWritable instance.
+ *
+ * @param record list of fields to be populated
+ * @param gpdbWritableType field type
+ * @param val field value
+ * @return 1 (number of populated fields)
+ */
+ int addOneFieldToRecord(List<OneField> record, DataType gpdbWritableType,
+ Object val) {
+ OneField oneField = new OneField();
+ oneField.type = gpdbWritableType.getOID();
+ switch (gpdbWritableType) {
+ case BYTEA:
+ if (val instanceof ByteBuffer) {
+ oneField.val = ((ByteBuffer) val).array();
+ } else {
+ /*
+ * Entry point when the underlying byte array comes from an
+ * Avro Fixed value
+ */
+ oneField.val = ((GenericData.Fixed) val).bytes();
+ }
+ break;
+ default:
+ oneField.val = val;
+ break;
+ }
+
+ record.add(oneField);
+ return 1;
+ }
+
+ /**
+ * Opens Avro schema based on DATA-SCHEMA parameter.
+ *
+ * @return InputStream of schema file
+ * @throws DataSchemaException if schema file could not be opened
+ */
+ InputStream openExternalSchema() {
+
+ String schemaName = inputData.getUserProperty("DATA-SCHEMA");
+
+ /*
+ * Testing that the schema name was supplied by the user - schema is an
+ * optional property.
+ */
+ if (schemaName == null) {
+ throw new DataSchemaException(SCHEMA_NOT_INDICATED,
+ this.getClass().getName());
+ }
+
+ /** Testing that the schema resource exists. */
+ if (this.getClass().getClassLoader().getResource(schemaName) == null) {
+ throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
+ }
+ ClassLoader loader = this.getClass().getClassLoader();
+ return loader.getResourceAsStream(schemaName);
+ }
+}
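For illustration only (not part of this commit): a minimal standalone Java sketch of the decoder-reuse pattern that makeAvroRecord() relies on, serializing one Avro record into a flat byte[] and reading it back with a GenericDatumReader. The schema and the sample values are made up for the example.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroDecodeSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical schema; in AvroResolver it comes from the Avro file itself or the *.avsc file.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"int\"},"
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Serialize one record into a flat byte[] - the shape AvroResolver sees
        // when the records arrive from a sequence file.
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("id", 1);
        rec.put("name", "alice");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
        encoder.flush();

        // Decode it back, reusing decoder/record instances the way makeAvroRecord() does.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord decoded = reader.read(null, decoder);
        System.out.println(decoded); // {"id": 1, "name": "alice"}
    }
}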
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java
new file mode 100644
index 0000000..6c8d54c
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java
@@ -0,0 +1,175 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class that provides a line reader from an input stream. Lines are
+ * terminated by '\n' (LF). EOF also terminates an otherwise unterminated line.
+ */
+public class ChunkReader implements Closeable {
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
+ private InputStream in;
+ private byte[] buffer;
+ // the number of bytes of real data in the buffer
+ private int bufferLength = 0;
+ // the current position in the buffer
+ private int bufferPosn = 0;
+ private static final byte LF = '\n';
+
+ /**
+ * Constructs a ChunkReader instance
+ *
+ * @param in input stream
+ */
+ public ChunkReader(InputStream in) {
+ this.in = in;
+ this.buffer = new byte[this.bufferSize];
+ }
+
+ /**
+ * Closes the underlying stream.
+ */
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ /*
+ * Internal class used for holding part of a chunk brought by one read()
+ * operation on the input stream. We collect several such nodes in a list
+ * by doing several read operations until we reach the chunk size -
+ * maxBytesToConsume.
+ */
+ private class Node {
+ /* part of a chunk brought in a single inputstream.read() operation */
+ public byte[] slice;
+ /* the size of the slice */
+ public int len;
+ }
+
+ /**
+ * Reads data in buffer-sized slices until we reach maxBytesToConsume,
+ * the requested chunk size.
+ *
+ * @param str - output parameter, will contain the read chunk byte array
+ * @param maxBytesToConsume - requested chunk size
+ * @return actual chunk size
+ * @throws IOException if the first byte cannot be read for any reason
+ * other than the end of the file, if the input stream has been closed,
+ * or if some other I/O error occurs.
+ */
+ public int readChunk(Writable str, int maxBytesToConsume) throws IOException
+ {
+ ChunkWritable cw = (ChunkWritable) str;
+ List<Node> list = new LinkedList<Node>();
+
+ long bytesConsumed = 0;
+
+ do {
+ if (bufferLength > 0) {
+ int remaining = bufferLength - bufferPosn;
+ Node nd = new Node();
+ nd.slice = new byte[remaining];
+ nd.len = remaining;
+ System.arraycopy(buffer, bufferPosn, nd.slice, 0, nd.len);
+ list.add(nd);
+ bytesConsumed += nd.len;
+ } else {
+ Node nd = new Node();
+ nd.slice = new byte[buffer.length];
+ nd.len = in.read(nd.slice);
+ if (nd.len <= 0) {
+ break; // EOF
+ }
+ bytesConsumed += nd.len;
+ list.add(nd);
+ }
+
+ bufferLength = bufferPosn = 0;
+
+ } while (bytesConsumed < maxBytesToConsume);
+
+ if (list.size() > 0) {
+ cw.box = new byte[(int) bytesConsumed];
+ int pos = 0;
+ for (int i = 0; i < list.size(); i++) {
+ Node n = list.get(i);
+ System.arraycopy(n.slice, 0, cw.box, pos, n.len);
+ pos += n.len;
+ }
+ }
+
+ return (int) bytesConsumed;
+ }
+
+ /**
+ * Reads a line terminated by LF.
+ *
+ * @param str - output parameter, will contain the read record
+ * @param maxBytesToConsume - the line mustn't exceed this value
+ * @return length of the line read
+ * @throws IOException if the first byte cannot be read for any reason
+ * other than the end of the file, if the input stream has been closed,
+ * or if some other I/O error occurs.
+ */
+ public int readLine(Writable str, int maxBytesToConsume) throws IOException {
+ ChunkWritable cw = (ChunkWritable) str;
+ List<Node> list = new LinkedList<Node>();
+
+ boolean newLine = false; // whether a terminating newline was found
+ long bytesConsumed = 0;
+
+ do {
+ int startPosn = bufferPosn; // starting from where we left off the
+ // last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+
+ bufferLength = in.read(buffer);
+ if (bufferLength <= 0) {
+ break; // EOF
+ }
+ }
+
+ for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
+ // newline
+ if (buffer[bufferPosn] == LF) {
+ newLine = true;
+ ++bufferPosn; // at next invocation proceed from following
+ // byte
+ break;
+ }
+ }
+
+ int readLength = bufferPosn - startPosn;
+ bytesConsumed += readLength;
+
+ if (readLength > 0) {
+ Node nd = new Node();
+ nd.slice = new byte[readLength];
+ nd.len = readLength;
+ System.arraycopy(buffer, startPosn, nd.slice, 0, nd.len);
+ list.add(nd);
+ }
+ } while (!newLine && bytesConsumed < maxBytesToConsume);
+
+ if (list.size() > 0) {
+ cw.box = new byte[(int) bytesConsumed];
+ int pos = 0;
+ for (int i = 0; i < list.size(); i++) {
+ Node n = list.get(i);
+ System.arraycopy(n.slice, 0, cw.box, pos, n.len);
+ pos += n.len;
+ }
+ }
+
+ return (int) bytesConsumed;
+ }
+}
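A hypothetical driver (not part of this commit) showing how the ChunkReader and the ChunkWritable buffer it fills fit together. It assumes the class sits in the same org.apache.hawq.pxf.plugins.hdfs package and that /tmp/sample.txt exists; any input stream would do.

import java.io.FileInputStream;
import java.io.IOException;

public class ChunkReaderSketch {
    public static void main(String[] args) throws IOException {
        try (FileInputStream in = new FileInputStream("/tmp/sample.txt")) {
            ChunkReader reader = new ChunkReader(in);
            ChunkWritable chunk = new ChunkWritable();
            long total = 0;
            int n;
            // Pull ~1MB chunks; readChunk() does not stop at record boundaries.
            while ((n = reader.readChunk(chunk, 1024 * 1024)) > 0) {
                total += n; // chunk.box holds the raw bytes that were just read
            }
            System.out.println("read " + total + " bytes");
        }
    }
}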
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java
new file mode 100644
index 0000000..590b89c
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java
@@ -0,0 +1,281 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * ChunkRecordReader is designed for fast reading of a file split. The idea is
+ * to bring chunks of data instead of single records. The chunks contain many
+ * records and the chunk end is not aligned on a record boundary. The size of
+ * the chunk is a class hardcoded parameter - CHUNK_SIZE. This behaviour sets
+ * this reader apart from the other readers which will fetch one record and stop
+ * when reaching a record delimiter.
+ */
+public class ChunkRecordReader implements
+ RecordReader<LongWritable, ChunkWritable> {
+ private static final Log LOG = LogFactory.getLog(ChunkRecordReader.class.getName());
+
+ private CompressionCodecFactory compressionCodecs = null;
+ private long start;
+ private long pos;
+ private long end;
+ private long fileLength;
+ private ChunkReader in;
+ private FSDataInputStream fileIn;
+ private final Seekable filePosition;
+ private int maxLineLength;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private static final int CHUNK_SIZE = 1024 * 1024;
+
+ /**
+ * Translates the FSDataInputStream into a DFSInputStream.
+ */
+ private DFSInputStream getInputStream() {
+ return (DFSInputStream) (fileIn.getWrappedStream());
+ }
+
+ /**
+ * Returns statistics of the input stream's read operation: total bytes
+ * read, bytes read locally, bytes read in short-circuit (directly from file
+ * descriptor).
+ *
+ * @return an instance of ReadStatistics class
+ */
+ public ReadStatistics getReadStatistics() {
+ return getInputStream().getReadStatistics();
+ }
+
+ /**
+ * Constructs a ChunkRecordReader instance.
+ *
+ * @param job the job configuration
+ * @param split contains the file name, begin byte of the split and the
+ * bytes length
+ * @throws IOException if an I/O error occurs when accessing the file or
+ * creating input stream to read from it
+ */
+ public ChunkRecordReader(Configuration job, FileSplit split)
+ throws IOException {
+ maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
+ validateLength(maxLineLength);
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+ compressionCodecs = new CompressionCodecFactory(job);
+ codec = compressionCodecs.getCodec(file);
+
+ // open the file and seek to the start of the split
+ job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ final FileSystem fs = file.getFileSystem(job);
+ fs.setVerifyChecksum(false);
+ fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
+ fileLength = getInputStream().getFileLength();
+ if (isCompressedInput()) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+ fileIn, decompressor, start, end,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+ in = new ChunkReader(cIn);
+ start = cIn.getAdjustedStart();
+ end = cIn.getAdjustedEnd();
+ filePosition = cIn; // take pos from compressed stream
+ } else {
+ in = new ChunkReader(codec.createInputStream(fileIn,
+ decompressor));
+ filePosition = fileIn;
+ }
+ } else {
+ fileIn.seek(start);
+ in = new ChunkReader(fileIn);
+ filePosition = fileIn;
+ }
+ /*
+ * If this is not the first split, we always throw away first record
+ * because we always (except the last split) read one extra line in
+ * next() method.
+ */
+ if (start != 0) {
+ start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
+ }
+ this.pos = start;
+ }
+
+ /**
+ * Used by the client of this class to create the 'key' output parameter for
+ * next() method.
+ *
+ * @return an instance of LongWritable
+ */
+ @Override
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ /**
+ * Used by the client of this class to create the 'value' output parameter
+ * for next() method.
+ *
+ * @return an instance of ChunkWritable
+ */
+ @Override
+ public ChunkWritable createValue() {
+ return new ChunkWritable();
+ }
+
+ /**
+ * Fetches the next data chunk from the file split. The size of the chunk is
+ * a class hardcoded parameter - CHUNK_SIZE. This behaviour sets this reader
+ * apart from the other readers which will fetch one record and stop when
+ * reaching a record delimiter.
+ *
+ * @param key - output parameter. When method returns will contain the key -
+ * the number of the start byte of the chunk
+ * @param value - output parameter. When method returns will contain the
+ * value - the chunk, a byte array inside the ChunkWritable
+ * instance
+ * @return false - when end of split was reached
+ * @throws IOException if an I/O error occurred while reading the next chunk
+ * or line
+ */
+ @Override
+ public synchronized boolean next(LongWritable key, ChunkWritable value)
+ throws IOException {
+ /*
+ * Usually a record is spread between the end of the current split and
+ * the beginning of the next split. So when reading the last record in
+ * the split we usually need to cross over to the next split. This
+ * tricky logic is implemented in ChunkReader.readLine(). In order not
+ * to rewrite this logic we read the last chunk in the split with
+ * readLine(). For a split of 120M, reading the last 1M line by line
+ * doesn't have a huge impact. We apply a factor to the last chunk to
+ * make sure we start before the last record.
+ */
+ float factor = 1.5f;
+ int limit = (int) (factor * CHUNK_SIZE);
+ long curPos = getFilePosition();
+ int newSize = 0;
+
+ while (curPos <= end) {
+ key.set(pos);
+
+ if ((end - curPos) > limit) {
+ newSize = in.readChunk(value, CHUNK_SIZE);
+ } else {
+ newSize = in.readLine(value,
+ Math.max(maxBytesToConsume(pos), maxLineLength));
+ }
+ if (newSize == 0) {
+ break;
+ }
+
+ pos += newSize;
+
+ if (pos == fileLength) { /*
+ * in case text file last character is not
+ * a linefeed
+ */
+ if (value.box[value.box.length - 1] != '\n') {
+ int newLen = value.box.length + 1;
+ byte[] tmp = new byte[newLen];
+ System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
+ tmp[newLen - 1] = '\n';
+ value.box = tmp;
+ }
+ }
+
+ return true;
+ }
+ /*
+ * if we got here, either newSize was 0 or curPos is bigger than end
+ */
+
+ return false;
+ }
+
+ /**
+ * Gets the progress within the split.
+ */
+ @Override
+ public synchronized float getProgress() throws IOException {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (getFilePosition() - start)
+ / (float) (end - start));
+ }
+ }
+
+ /**
+ * Returns the position of the unread tail of the file
+ *
+ * @return pos - start byte of the unread tail of the file
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
+
+ /**
+ * Closes the input stream.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
+ }
+ }
+
+ private void validateLength(int maxLineLength) {
+ if (maxLineLength <= 0)
+ throw new IllegalArgumentException(
+ "maxLineLength must be a positive value");
+ }
+
+ private boolean isCompressedInput() {
+ return (codec != null);
+ }
+
+ private int maxBytesToConsume(long pos) {
+ return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
+ Integer.MAX_VALUE, end - pos);
+ }
+
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompressedInput() && null != filePosition) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+} // class ChunkRecordReader
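A sketch of the intended read loop (not part of the commit). It assumes an actual HDFS deployment, since the reader unwraps a DFSInputStream internally, and the file path and split length below are placeholders.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;

public class ChunkScanSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical HDFS file and split covering the first 128 MB.
        Path file = new Path("hdfs://namenode:8020/data/part-00000");
        FileSplit split = new FileSplit(file, 0, 128L * 1024 * 1024, (String[]) null);

        ChunkRecordReader reader = new ChunkRecordReader(conf, split);
        LongWritable key = reader.createKey();      // start byte of each chunk
        ChunkWritable value = reader.createValue(); // raw bytes of many records
        long chunks = 0;
        while (reader.next(key, value)) {
            chunks++;
        }
        reader.close();
        System.out.println("chunks read: " + chunks);
    }
}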
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java
new file mode 100644
index 0000000..936c3ef
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java
@@ -0,0 +1,39 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.lang.UnsupportedOperationException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Just an output buffer for the ChunkRecordReader. It must implement
+ * Writable, otherwise it will not fit into the next() interface method.
+ */
+public class ChunkWritable implements Writable {
+ public byte [] box;
+
+ /**
+ * Serializes the fields of this object to <code>out</code>.
+ *
+ * @param out <code>DataOutput</code> to serialize this object into.
+ * @throws UnsupportedOperationException this function is not supported
+ */
+ @Override
+ public void write(DataOutput out) {
+ throw new UnsupportedOperationException("ChunkWritable.write() is not implemented");
+ }
+
+ /**
+ * Deserializes the fields of this object from <code>in</code>.
+ * <p>For efficiency, implementations should attempt to re-use storage in the
+ * existing object where possible.</p>
+ *
+ * @param in <code>DataInput</code> to deserialize this object from.
+ * @throws UnsupportedOperationException this function is not supported
+ */
+ @Override
+ public void readFields(DataInput in) {
+ throw new UnsupportedOperationException("ChunkWritable.readFields() is not implemented");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
new file mode 100644
index 0000000..c80244a
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
@@ -0,0 +1,145 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.Analyzer;
+import org.apache.hawq.pxf.api.AnalyzerStats;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.ReadBridge;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * Analyzer class for HDFS data resources
+ *
+ * Given an HDFS data source (a file, directory, or wild card pattern)
+ * return statistics about it (number of blocks, number of tuples, etc.)
+ */
+public class HdfsAnalyzer extends Analyzer {
+ private JobConf jobConf;
+ private FileSystem fs;
+ private Log Log;
+
+ /**
+ * Constructs an HdfsAnalyzer object.
+ *
+ * @param inputData all input parameters coming from the client
+ * @throws IOException if HDFS file system cannot be retrieved
+ */
+ public HdfsAnalyzer(InputData inputData) throws IOException {
+ super(inputData);
+ Log = LogFactory.getLog(HdfsAnalyzer.class);
+
+ jobConf = new JobConf(new Configuration(), HdfsAnalyzer.class);
+ fs = FileSystem.get(jobConf);
+ }
+
+ /**
+ * Collects a number of basic statistics based on an estimate. Statistics
+ * are: number of records, number of hdfs blocks and hdfs block size.
+ *
+ * @param datapath path is a data source URI that can appear as a file
+ * name, a directory name or a wildcard pattern
+ * @return statistics in JSON format
+ * @throws Exception if path is wrong, its metadata cannot be retrieved
+ * from file system, or if scanning the first block
+ * using the accessor failed
+ */
+ @Override
+ public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
+ long blockSize = 0;
+ long numberOfBlocks;
+ Path path = new Path(HdfsUtilities.absoluteDataPath(datapath));
+
+ ArrayList<InputSplit> splits = getSplits(path);
+
+ for (InputSplit split : splits) {
+ FileSplit fsp = (FileSplit) split;
+ Path filePath = fsp.getPath();
+ FileStatus fileStatus = fs.getFileStatus(filePath);
+ if (fileStatus.isFile()) {
+ blockSize = fileStatus.getBlockSize();
+ break;
+ }
+ }
+
+ // if no file is in path (only dirs), get default block size
+ if (blockSize == 0) {
+ blockSize = fs.getDefaultBlockSize(path);
+ }
+ numberOfBlocks = splits.size();
+
+
+ long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits);
+ AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks);
+
+ //print files size to log when in debug level
+ Log.debug(AnalyzerStats.dataToString(stats, path.toString()));
+
+ return stats;
+ }
+
+ /**
+ * Calculates the number of tuples in a split (block).
+ * Reads one block from HDFS. An exception during reading will
+ * propagate upwards and be handled in AnalyzerResource.
+ */
+ private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits) throws Exception {
+ long tuples = -1; /* default - if we are not able to read data */
+ ReadAccessor accessor;
+
+ if (splits.isEmpty()) {
+ return 0;
+ }
+
+ /*
+ * metadata information includes: file split's
+ * start, length and hosts (locations).
+ */
+ FileSplit firstSplit = (FileSplit) splits.get(0);
+ byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit);
+ inputData.setFragmentMetadata(fragmentMetadata);
+ inputData.setDataSource(firstSplit.getPath().toUri().getPath());
+ accessor = ReadBridge.getFileAccessor(inputData);
+
+ if (accessor.openForRead()) {
+ tuples = 0;
+ while (accessor.readNextObject() != null) {
+ tuples++;
+ }
+
+ accessor.closeForRead();
+ }
+
+ return tuples;
+ }
+
+ private ArrayList<InputSplit> getSplits(Path path) throws IOException {
+ PxfInputFormat fformat = new PxfInputFormat();
+ PxfInputFormat.setInputPaths(jobConf, path);
+ InputSplit[] splits = fformat.getSplits(jobConf, 1);
+ ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+ // remove empty splits
+ if (splits != null) {
+ for (InputSplit split : splits) {
+ if (split.getLength() > 0) {
+ result.add(split);
+ }
+ }
+ }
+
+ return result;
+ }
+}
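As a worked example of the estimate (figures made up for illustration): if the first non-empty split is found to contain 40,000 tuples, the data source resolves to 25 non-empty splits, and the file's block size is 128 MB, then getEstimatedStats() reports blockSize = 128 MB, numberOfBlocks = 25 and an estimated 40,000 * 25 = 1,000,000 tuples.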
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
new file mode 100644
index 0000000..fb728fa
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -0,0 +1,112 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * Base class for enforcing the complete access of a file in one accessor.
+ * Since we are not accessing the file using the splittable API, but instead are
+ * using the "simple" stream API, it means that we cannot fetch different parts
+ * (splits) of the file in different segments. Instead each file access brings
+ * the complete file. If several segments were to access the same file, each
+ * one would return the whole file and every record would appear
+ * number_of_segments times in the query result. To avoid this we only let
+ * one segment (segment 0) work in this case - enforced with the
+ * isWorkingSegment() method. Naturally this is the least recommended working
+ * mode since we are not making use of segment parallelism. HDFS accessors for
+ * a specific file type should inherit from this class only if the file they are
+ * reading does not support splitting: a protocol-buffer file, regular file, ...
+ */
+public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor {
+ private Configuration conf = null;
+ protected InputStream inp = null;
+ private FileSplit fileSplit = null;
+
+ /**
+ * Constructs a HdfsAtomicDataAccessor object.
+ *
+ * @param input all input parameters coming from the client
+ */
+ public HdfsAtomicDataAccessor(InputData input) {
+ // 0. Hold the configuration data
+ super(input);
+
+ // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
+ conf = new Configuration();
+
+ fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+ }
+
+ /**
+ * Opens the file using the non-splittable API for HADOOP HDFS file access
+ * This means that instead of using a FileInputFormat for access, we use a
+ * Java stream.
+ *
+ * @return true for successful file open, false otherwise
+ */
+ @Override
+ public boolean openForRead() throws Exception {
+ if (!isWorkingSegment()) {
+ return false;
+ }
+
+ // input data stream
+ FileSystem fs = FileSystem.get(URI.create(inputData.getDataSource()), conf); // fs.open() below returns an FSDataInputStream
+ inp = fs.open(new Path(inputData.getDataSource()));
+
+ return (inp != null);
+ }
+
+ /**
+ * Fetches one record from the file.
+ *
+ * @return a {@link OneRow} record as a Java object. Returns null if none.
+ */
+ @Override
+ public OneRow readNextObject() throws IOException {
+ if (!isWorkingSegment()) {
+ return null;
+ }
+
+ return new OneRow(null, new Object());
+ }
+
+ /**
+ * Closes the access stream when finished reading the file
+ */
+ @Override
+ public void closeForRead() throws Exception {
+ if (!isWorkingSegment()) {
+ return;
+ }
+
+ if (inp != null) {
+ inp.close();
+ }
+ }
+
+ /*
+ * Making sure that only the segment that got assigned the first data
+ * fragment will read the (whole) file.
+ */
+ private boolean isWorkingSegment() {
+ return (fileSplit.getStart() == 0);
+ }
+
+ @Override
+ public boolean isThreadSafe() {
+ return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
+ inputData.getUserProperty("COMPRESSION_CODEC"));
+ }
+}
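To make the single-worker rule concrete: if, say, four segments all receive a fragment for the same non-splittable file, only the segment whose fragment metadata carries a start offset of 0 passes isWorkingSegment() and streams the file; the other three return no rows, so each record appears exactly once in the query result (the segment count is chosen here purely for illustration).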
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
new file mode 100644
index 0000000..5c81ef8
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
@@ -0,0 +1,79 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fragmenter class for HDFS data resources.
+ *
+ * Given an HDFS data source (a file, directory, or wild card pattern) divide
+ * the data into fragments and return a list of them along with a list of
+ * host:port locations for each.
+ */
+public class HdfsDataFragmenter extends Fragmenter {
+ private JobConf jobConf;
+
+ /**
+ * Constructs an HdfsDataFragmenter object.
+ *
+ * @param md all input parameters coming from the client
+ */
+ public HdfsDataFragmenter(InputData md) {
+ super(md);
+
+ jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class);
+ }
+
+ /**
+ * Gets the fragments for a data source URI that can appear as a file name,
+ * a directory name or a wildcard. Returns the data fragments in JSON
+ * format.
+ */
+ @Override
+ public List<Fragment> getFragments() throws Exception {
+ String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+ InputSplit[] splits = getSplits(new Path(absoluteDataPath));
+
+ for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
+ FileSplit fsp = (FileSplit) split;
+
+ /*
+ * HD-2547: If the file is empty, an empty split is returned: no
+ * locations and no length.
+ */
+ if (fsp.getLength() <= 0) {
+ continue;
+ }
+
+ String filepath = fsp.getPath().toUri().getPath();
+ String[] hosts = fsp.getLocations();
+
+ /*
+ * metadata information includes: file split's start, length and
+ * hosts (locations).
+ */
+ byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp);
+ Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
+ fragments.add(fragment);
+ }
+
+ return fragments;
+ }
+
+ private InputSplit[] getSplits(Path path) throws IOException {
+ PxfInputFormat format = new PxfInputFormat();
+ PxfInputFormat.setInputPaths(jobConf, path);
+ return format.getSplits(jobConf, 1);
+ }
+}
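For example (illustrative figures only): a 300 MB text file stored with a 128 MB HDFS block size would typically yield three fragments of roughly 128 MB, 128 MB and 44 MB, each Fragment carrying the file path, the hosts that hold its block, and the serialized split metadata (start and length); empty splits are dropped as the code above shows.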
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
new file mode 100644
index 0000000..c1f2442
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -0,0 +1,146 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+/**
+ * Accessor for accessing splittable HDFS data sources. HDFS will divide the
+ * file into splits based on an internal decision (by default, the block size is
+ * also the split size).
+ *
+ * Accessors that require such base functionality should extend this class.
+ */
+public abstract class HdfsSplittableDataAccessor extends Plugin implements
+ ReadAccessor {
+ protected Configuration conf = null;
+ protected RecordReader<Object, Object> reader = null;
+ protected InputFormat<?, ?> inputFormat = null;
+ protected ListIterator<InputSplit> iter = null;
+ protected JobConf jobConf = null;
+ protected Object key, data;
+
+ /**
+ * Constructs an HdfsSplittableDataAccessor
+ *
+ * @param input all input parameters coming from the client request
+ * @param inFormat the HDFS {@link InputFormat} the caller wants to use
+ */
+ public HdfsSplittableDataAccessor(InputData input,
+ InputFormat<?, ?> inFormat) {
+ super(input);
+ inputFormat = inFormat;
+
+ // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
+ conf = new Configuration();
+
+ // 2. variable required for the splits iteration logic
+ jobConf = new JobConf(conf, HdfsSplittableDataAccessor.class);
+ }
+
+ /**
+ * Fetches the requested fragment (file split) for the current client
+ * request, and sets a record reader for the job.
+ *
+ * @return true if succeeded, false if no more splits to be read
+ */
+ @Override
+ public boolean openForRead() throws Exception {
+ LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
+ FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+ requestSplits.add(fileSplit);
+
+ // Initialize record reader based on current split
+ iter = requestSplits.listIterator(0);
+ return getNextSplit();
+ }
+
+ /**
+ * Specialized accessors will override this method and implement their own
+ * recordReader. For example, a plain delimited text accessor may want to
+ * return a LineRecordReader.
+ *
+ * @param jobConf the hadoop jobconf to use for the selected InputFormat
+ * @param split the input split to be read by the accessor
+ * @return a recordreader to be used for reading the data records of the
+ * split
+ * @throws IOException if recordreader could not be created
+ */
+ abstract protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException;
+
+ /**
+ * Sets the current split and initializes a RecordReader that feeds from
+ * the split.
+ *
+ * @return true if there is a split to read
+ * @throws IOException if record reader could not be created
+ */
+ @SuppressWarnings(value = "unchecked")
+ protected boolean getNextSplit() throws IOException {
+ if (!iter.hasNext()) {
+ return false;
+ }
+
+ InputSplit currSplit = iter.next();
+ reader = (RecordReader<Object, Object>) getReader(jobConf, currSplit);
+ key = reader.createKey();
+ data = reader.createValue();
+ return true;
+ }
+
+ /**
+ * Fetches one record from the file. The record is returned as a Java
+ * object.
+ */
+ @Override
+ public OneRow readNextObject() throws IOException {
+ // try to read one more record from the current split
+ if (!reader.next(key, data)) {
+ // the current split is exhausted. try to move to the next split
+ if (getNextSplit()) {
+ // read the first record of the new split
+ if (!reader.next(key, data)) {
+ // make sure we return nulls
+ return null;
+ }
+ } else {
+ // make sure we return nulls
+ return null;
+ }
+ }
+
+ /*
+ * If we reach this point, a record was read from the current split or
+ * from the next one, so we wrap the key and data in a OneRow and return
+ * it. End of the records sequence is signaled by the null returns above.
+ */
+ return new OneRow(key, data);
+ }
+
+ /**
+ * When the user has finished reading the file, closes the RecordReader.
+ */
+ @Override
+ public void closeForRead() throws Exception {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ @Override
+ public boolean isThreadSafe() {
+ return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
+ inputData.getUserProperty("COMPRESSION_CODEC"));
+ }
+
+}
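A hedged sketch of the read lifecycle this base class defines (openForRead / readNextObject / closeForRead), using the LineBreakAccessor added later in this commit as the concrete subclass. The InputData argument is assumed to arrive pre-populated by the PXF service with the fragment metadata and is not constructed here.

import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.utilities.InputData;

public class ReadLoopSketch {
    public static long countRows(InputData input) throws Exception {
        HdfsSplittableDataAccessor accessor = new LineBreakAccessor(input);
        long rows = 0;
        if (accessor.openForRead()) {       // positions the reader on the fragment's split
            OneRow row;
            while ((row = accessor.readNextObject()) != null) {
                rows++;                     // row.getData() holds one chunk of records
            }
            accessor.closeForRead();
        }
        return rows;
    }
}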
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java
new file mode 100644
index 0000000..2727a9b
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java
@@ -0,0 +1,128 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A PXF Accessor for reading delimited plain text records.
+ */
+public class LineBreakAccessor extends HdfsSplittableDataAccessor implements
+ WriteAccessor {
+ private DataOutputStream dos;
+ private FSDataOutputStream fsdos;
+ private Configuration conf;
+ private FileSystem fs;
+ private Path file;
+ private static Log Log = LogFactory.getLog(LineBreakAccessor.class);
+
+ /**
+ * Constructs a LineBreakAccessor.
+ *
+ * @param input all input parameters coming from the client request
+ */
+ public LineBreakAccessor(InputData input) {
+ super(input, new TextInputFormat());
+ ((TextInputFormat) inputFormat).configure(jobConf);
+ }
+
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException {
+ return new ChunkRecordReader(jobConf, (FileSplit) split);
+ }
+
+ /**
+ * Opens file for write.
+ */
+ @Override
+ public boolean openForWrite() throws Exception {
+
+ String fileName = inputData.getDataSource();
+ String compressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
+ CompressionCodec codec = null;
+
+ conf = new Configuration();
+ fs = FileSystem.get(conf);
+
+ // get compression codec
+ if (compressCodec != null) {
+ codec = HdfsUtilities.getCodec(conf, compressCodec);
+ String extension = codec.getDefaultExtension();
+ fileName += extension;
+ }
+
+ file = new Path(fileName);
+
+ if (fs.exists(file)) {
+ throw new IOException("file " + file.toString()
+ + " already exists, can't write data");
+ }
+ org.apache.hadoop.fs.Path parent = file.getParent();
+ if (!fs.exists(parent)) {
+ fs.mkdirs(parent);
+ Log.debug("Created new dir " + parent.toString());
+ }
+
+ // create output stream - do not allow overwriting existing file
+ createOutputStream(file, codec);
+
+ return true;
+ }
+
+ /*
+ * Creates an output stream for the given file. If a compression codec is
+ * provided, wraps it around the stream.
+ */
+ private void createOutputStream(Path file, CompressionCodec codec)
+ throws IOException {
+ fsdos = fs.create(file, false);
+ if (codec != null) {
+ dos = new DataOutputStream(codec.createOutputStream(fsdos));
+ } else {
+ dos = fsdos;
+ }
+
+ }
+
+ /**
+ * Writes row into stream.
+ */
+ @Override
+ public boolean writeNextObject(OneRow onerow) throws Exception {
+ dos.write((byte[]) onerow.getData());
+ return true;
+ }
+
+ /**
+ * Closes the output stream after done writing.
+ */
+ @Override
+ public void closeForWrite() throws Exception {
+ if ((dos != null) && (fsdos != null)) {
+ Log.debug("Closing writing stream for path " + file);
+ dos.flush();
+ /*
+ * From release 0.21.0 sync() is deprecated in favor of hflush(),
+ * which only guarantees that new readers will see all data written
+ * to that point, and hsync(), which makes a stronger guarantee that
+ * the operating system has flushed the data to disk (like POSIX
+ * fsync), although data may still be in the disk cache.
+ */
+ fsdos.hsync();
+ dos.close();
+ }
+ }
+}
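And a corresponding write-side sketch (again hypothetical, not part of the commit): the InputData is assumed to arrive pre-populated from the PXF service with the target path and user properties, and the two rows are arbitrary sample bytes.

import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.utilities.InputData;

public class LineWriteSketch {
    public static void writeSample(InputData input) throws Exception {
        LineBreakAccessor accessor = new LineBreakAccessor(input);
        if (accessor.openForWrite()) {   // refuses to overwrite an existing file
            accessor.writeNextObject(new OneRow(null, "1,alice\n".getBytes()));
            accessor.writeNextObject(new OneRow(null, "2,bob\n".getBytes()));
            accessor.closeForWrite();    // flush, hsync, close
        }
    }
}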
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
new file mode 100644
index 0000000..249a90d
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
@@ -0,0 +1,52 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * An (atomic) PXF Accessor for reading \n delimited files with quoted
+ * field delimiter, line delimiter, and quotes. This accessor supports
+ * multi-line records that are read from a single source (non-parallel).
+ */
+public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
+ private BufferedReader reader;
+
+ /**
+ * Constructs a QuotedLineBreakAccessor.
+ *
+ * @param input all input parameters coming from the client request
+ */
+ public QuotedLineBreakAccessor(InputData input) {
+ super(input);
+ }
+
+ @Override
+ public boolean openForRead() throws Exception {
+ if (!super.openForRead()) {
+ return false;
+ }
+ reader = new BufferedReader(new InputStreamReader(inp));
+ return true;
+ }
+
+ /**
+ * Fetches one record (maybe partial) from the file. The record is returned as a Java object.
+ */
+ @Override
+ public OneRow readNextObject() throws IOException {
+ if (super.readNextObject() == null) /* check if working segment */ {
+ return null;
+ }
+
+ String next_line = reader.readLine();
+ if (next_line == null) /* EOF */ {
+ return null;
+ }
+
+ return new OneRow(null, next_line);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java
new file mode 100644
index 0000000..9ef4cb9
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java
@@ -0,0 +1,215 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+/**
+ * A PXF Accessor for reading and writing Sequence File records
+ */
+public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
+ WriteAccessor {
+
+ private Configuration conf;
+ private FileContext fc;
+ private Path file;
+ private CompressionCodec codec;
+ private CompressionType compressionType;
+ private SequenceFile.Writer writer;
+ private LongWritable defaultKey; // used when recordkey is not defined
+
+ private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);
+
+ /**
+ * Constructs a SequenceFileAccessor.
+ *
+ * @param input all input parameters coming from the client request
+ */
+ public SequenceFileAccessor(InputData input) {
+ super(input, new SequenceFileInputFormat<Writable, Writable>());
+ }
+
+ /**
+ * Overrides virtual method to create specialized record reader
+ */
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException {
+ return new SequenceFileRecordReader<Object, Object>(jobConf, (FileSplit) split);
+ }
+
+ @Override
+ public boolean openForWrite() throws Exception {
+ FileSystem fs;
+ Path parent;
+ String fileName = inputData.getDataSource();
+ conf = new Configuration();
+
+ getCompressionCodec(inputData);
+ fileName = updateFileExtension(fileName, codec);
+
+ // construct the output stream
+ file = new Path(fileName);
+ fs = file.getFileSystem(conf);
+ fc = FileContext.getFileContext();
+ defaultKey = new LongWritable(inputData.getSegmentId());
+
+ if (fs.exists(file)) {
+ throw new IOException("file " + file
+ + " already exists, can't write data");
+ }
+ parent = file.getParent();
+ if (!fs.exists(parent)) {
+ fs.mkdirs(parent);
+ Log.debug("Created new dir " + parent);
+ }
+
+ writer = null;
+ return true;
+ }
+
+ /**
+ * Resolves the compression codec and compression type (default value RECORD)
+ * from the user properties. If there is no codec, the compression type is
+ * ignored and NONE is used.
+ *
+ * @param inputData container where compression codec and type are held
+ */
+ private void getCompressionCodec(InputData inputData) {
+
+ String userCompressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
+ String userCompressType = inputData.getUserProperty("COMPRESSION_TYPE");
+ String parsedCompressType = parseCompressionType(userCompressType);
+
+ compressionType = SequenceFile.CompressionType.NONE;
+ codec = null;
+ if (userCompressCodec != null) {
+ codec = HdfsUtilities.getCodec(conf, userCompressCodec);
+
+ try {
+ compressionType = CompressionType.valueOf(parsedCompressType);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Illegal value for compression type " + "'"
+ + parsedCompressType + "'");
+ }
+ if (compressionType == null) {
+ throw new IllegalArgumentException(
+ "Compression type must be defined");
+ }
+
+ Log.debug("Compression ON: " + "compression codec: "
+ + userCompressCodec + ", compression type: "
+ + compressionType);
+ }
+ }
+
+ /*
+ * Parses compression type for sequence file. If null, default to RECORD.
+ * Allowed values: RECORD, BLOCK.
+ */
+ private String parseCompressionType(String compressType) {
+ final String COMPRESSION_TYPE_RECORD = "RECORD";
+ final String COMPRESSION_TYPE_BLOCK = "BLOCK";
+ final String COMPRESSION_TYPE_NONE = "NONE";
+
+ if (compressType == null) {
+ return COMPRESSION_TYPE_RECORD;
+ }
+
+ if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) {
+ throw new IllegalArgumentException(
+ "Illegal compression type 'NONE'. "
+ + "For disabling compression remove COMPRESSION_CODEC parameter.");
+ }
+
+ if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD)
+ && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
+ throw new IllegalArgumentException("Illegal compression type '"
+ + compressType + "'");
+ }
+
+ return compressType.toUpperCase();
+ }
+
+ /*
+ * Returns fileName with the codec's file extension appended
+ */
+ private String updateFileExtension(String fileName, CompressionCodec codec) {
+
+ if (codec != null) {
+ fileName += codec.getDefaultExtension();
+ }
+ Log.debug("File name for write: " + fileName);
+ return fileName;
+ }
+
+ @Override
+ public boolean writeNextObject(OneRow onerow) throws IOException {
+ Writable value = (Writable) onerow.getData();
+ Writable key = (Writable) onerow.getKey();
+
+ // init writer lazily on the first call, based on the runtime type of onerow.getData()
+ // TODO: verify data is serializable.
+ if (writer == null) {
+ Class<? extends Writable> valueClass = value.getClass();
+ Class<? extends Writable> keyClass = (key == null) ? LongWritable.class
+ : key.getClass();
+ // create writer - do not allow overwriting existing file
+ writer = SequenceFile.createWriter(fc, conf, file, keyClass,
+ valueClass, compressionType, codec,
+ new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
+ }
+
+ try {
+ writer.append((key == null) ? defaultKey : key, value);
+ } catch (IOException e) {
+ Log.error("Failed to write data to file: " + e.getMessage());
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void closeForWrite() throws Exception {
+ if (writer != null) {
+ writer.sync();
+ /*
+ * From release 0.21.0 sync() is deprecated in favor of hflush(),
+ * which only guarantees that new readers will see all data written
+ * to that point, and hsync(), which makes a stronger guarantee that
+ * the operating system has flushed the data to disk (like POSIX
+ * fsync), although data may still be in the disk cache.
+ */
+ writer.hsync();
+ writer.close();
+ }
+ }
+
+ public CompressionType getCompressionType() {
+ return compressionType;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+}
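The write path above can be summarized as: resolve COMPRESSION_CODEC / COMPRESSION_TYPE, fail early if the target file exists, then lazily create a FileContext-based SequenceFile.Writer using the first record's key/value classes. The fragment below is a condensed, hypothetical restatement of that sequence, not code from this commit: the output path and classes are made up, it reuses the accessor's own HdfsUtilities.getCodec helper, it assumes the same imports as the file above plus org.apache.hadoop.io.Text, and it assumes an enclosing method that declares throws Exception.

    Configuration conf = new Configuration();
    Path file = new Path("/tmp/pxf_seqfile_demo.deflate");      // hypothetical output path
    FileContext fc = FileContext.getFileContext();
    CompressionCodec codec =
            HdfsUtilities.getCodec(conf, "org.apache.hadoop.io.compress.DefaultCodec");

    SequenceFile.Writer writer = SequenceFile.createWriter(fc, conf, file,
            LongWritable.class, Text.class,                      // key/value classes of the records
            SequenceFile.CompressionType.BLOCK, codec,
            new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));

    writer.append(new LongWritable(0), new Text("first record"));
    writer.hsync();                                              // same durability call closeForWrite() makes
    writer.close();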
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
new file mode 100644
index 0000000..efce79f
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
@@ -0,0 +1,74 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.WriteResolver;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
+
+/**
+ * StringPassResolver handles "deserialization" and serialization of
+ * String records. StringPassResolver implements the ReadResolver and
+ * WriteResolver interfaces. Returns strings as-is.
+ */
+public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver {
+ // for write
+ private OneRow oneRow;
+
+ /**
+ * Constructs a StringPassResolver.
+ *
+ * @param inputData all input parameters coming from the client request
+ */
+ public StringPassResolver(InputData inputData) {
+ super(inputData);
+ oneRow = new OneRow();
+ this.inputData = inputData;
+ }
+
+ /**
+ * Returns a list of the fields of one record.
+ * Each record field is represented by a {@link OneField} item.
+ * OneField item contains two fields: an integer representing the field type and a Java
+ * Object representing the field value.
+ */
+ @Override
+ public List<OneField> getFields(OneRow onerow) {
+ /*
+ * This call forces a whole text line into a single varchar field, replacing
+ * proper field separation (the original parsing code can be found in previous
+ * revisions). The reasons for doing so at this point are:
+ * 1. performance
+ * 2. desire to not replicate text parsing logic from the backend into java
+ */
+ List<OneField> record = new LinkedList<OneField>();
+ Object data = onerow.getData();
+ if (data instanceof ChunkWritable) {
+ record.add(new OneField(DataType.BYTEA.getOID(), ((ChunkWritable)data).box));
+ }
+ else {
+ record.add(new OneField(VARCHAR.getOID(), data));
+ }
+ return record;
+ }
+
+ /**
+ * Creates a OneRow object from the singleton list.
+ */
+ @Override
+ public OneRow setFields(List<OneField> record) throws Exception {
+ if (((byte[]) record.get(0).val).length == 0) {
+ return null;
+ }
+
+ oneRow.setData(record.get(0).val);
+ return oneRow;
+ }
+}
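On the read side this resolver does no parsing at all: a line-oriented accessor (for example the QuotedLineBreakAccessor above) hands it a OneRow whose data is the raw line, and getFields() wraps that line in a single VARCHAR field, or in a single BYTEA field when the data arrives as a ChunkWritable. A small, hypothetical illustration (inputData is assumed to come from the client request):

    StringPassResolver resolver = new StringPassResolver(inputData);
    OneRow row = new OneRow(null, "1|alice|42");        // one raw text line from the accessor
    List<OneField> fields = resolver.getFields(row);
    // fields holds exactly one OneField whose val is the untouched line "1|alice|42"
    // and whose type is VARCHAR's OID; column splitting is left to the database backend.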
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java
new file mode 100644
index 0000000..fa8da82
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java
@@ -0,0 +1,220 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+
+/**
+ * WritableResolver handles serialization and deserialization of records
+ * that were serialized using Hadoop's Writable serialization framework.
+ *
+ * A field named 'recordkey' is treated as a key of the given row, and not as
+ * part of the data schema. See {@link RecordkeyAdapter}.
+ */
+public class WritableResolver extends Plugin implements ReadResolver, WriteResolver {
+ private static final int RECORDKEY_UNDEFINED = -1;
+ private static final Log LOG = LogFactory.getLog(WritableResolver.class);
+ private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
+ private int recordkeyIndex;
+ // reflection fields
+ private Object userObject = null;
+ private Field[] fields = null;
+
+
+ /**
+ * Constructs a WritableResolver.
+ *
+ * @param input all input parameters coming from the client
+ * @throws Exception if the schema file was not specified, cannot be found
+ * on the classpath, or fails to instantiate
+ */
+ public WritableResolver(InputData input) throws Exception {
+ super(input);
+
+ String schemaName = inputData.getUserProperty("DATA-SCHEMA");
+
+ /* Testing that the schema name was supplied by the user - the DATA-SCHEMA option is optional in general, but this resolver requires it. */
+ if (schemaName == null) {
+ throw new DataSchemaException(SCHEMA_NOT_INDICATED, this.getClass().getName());
+ }
+
+ /* Testing that the schema resource exists. */
+ if (!isSchemaOnClasspath(schemaName)) {
+ throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
+ }
+
+ userObject = Utilities.createAnyInstance(schemaName);
+ fields = userObject.getClass().getDeclaredFields();
+ recordkeyIndex = (inputData.getRecordkeyColumn() == null)
+ ? RECORDKEY_UNDEFINED
+ : inputData.getRecordkeyColumn().columnIndex();
+
+ // fields details:
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < fields.length; i++) {
+ Field field = fields[i];
+ String javaType = field.getType().getName();
+ boolean isPrivate = Modifier.isPrivate(field.getModifiers());
+
+ LOG.debug("Field #" + i + ", name: " + field.getName() +
+ " type: " + javaType + ", " +
+ (isArray(javaType) ? "Array" : "Primitive") + ", " +
+ (isPrivate ? "Private" : "accessible") + " field");
+ }
+ }
+ }
+
+ private boolean isArray(String javaType) {
+ return (javaType.startsWith("[") && !"[B".equals(javaType));
+ }
+
+ @Override
+ public List<OneField> getFields(OneRow onerow) throws Exception {
+ userObject = onerow.getData();
+ List<OneField> record = new LinkedList<OneField>();
+
+ int currentIdx = 0;
+ for (Field field : fields) {
+ if (currentIdx == recordkeyIndex) {
+ currentIdx += recordkeyAdapter.appendRecordkeyField(record, inputData, onerow);
+ }
+
+ if (Modifier.isPrivate(field.getModifiers())) {
+ continue;
+ }
+
+ currentIdx += populateRecord(record, field);
+ }
+
+ return record;
+ }
+
+ int setArrayField(List<OneField> record, int dataType, Field reflectedField) throws IllegalAccessException {
+ Object array = reflectedField.get(userObject);
+ int length = Array.getLength(array);
+ for (int j = 0; j < length; j++) {
+ record.add(new OneField(dataType, Array.get(array, j)));
+ }
+ return length;
+ }
+
+ /*
+ * Given a java Object type, convert it to the corresponding output field
+ * type.
+ */
+ private DataType convertJavaToGPDBType(String type) {
+ if ("boolean".equals(type) || "[Z".equals(type)) {
+ return BOOLEAN;
+ }
+ if ("int".equals(type) || "[I".equals(type)) {
+ return INTEGER;
+ }
+ if ("double".equals(type) || "[D".equals(type)) {
+ return FLOAT8;
+ }
+ if ("java.lang.String".equals(type) || "[Ljava.lang.String;".equals(type)) {
+ return TEXT;
+ }
+ if ("float".equals(type) || "[F".equals(type)) {
+ return REAL;
+ }
+ if ("long".equals(type) || "[J".equals(type)) {
+ return BIGINT;
+ }
+ if ("[B".equals(type)) {
+ return BYTEA;
+ }
+ if ("short".equals(type) || "[S".equals(type)) {
+ return SMALLINT;
+ }
+ throw new UnsupportedTypeException("Type " + type + " is not supported by GPDBWritable");
+ }
+
+ int populateRecord(List<OneField> record, Field field) throws BadRecordException {
+ String javaType = field.getType().getName();
+ try {
+ DataType dataType = convertJavaToGPDBType(javaType);
+ if (isArray(javaType)) {
+ return setArrayField(record, dataType.getOID(), field);
+ }
+ record.add(new OneField(dataType.getOID(), field.get(userObject)));
+ return 1;
+ } catch (IllegalAccessException ex) {
+ throw new BadRecordException(ex);
+ }
+ }
+
+ /**
+ * Sets customWritable fields and creates a OneRow object.
+ */
+ @Override
+ public OneRow setFields(List<OneField> record) throws Exception {
+ Writable key = null;
+
+ int colIdx = 0;
+ for (Field field : fields) {
+ /*
+ * extract recordkey based on the column descriptor type
+ * and add to OneRow.key
+ */
+ if (colIdx == recordkeyIndex) {
+ key = recordkeyAdapter.convertKeyValue(record.get(colIdx).val);
+ colIdx++;
+ }
+
+ if (Modifier.isPrivate(field.getModifiers())) {
+ continue;
+ }
+
+ String javaType = field.getType().getName();
+ convertJavaToGPDBType(javaType);
+ if (isArray(javaType)) {
+ Object value = field.get(userObject);
+ int length = Array.getLength(value);
+ for (int j = 0; j < length; j++, colIdx++) {
+ Array.set(value, j, record.get(colIdx).val);
+ }
+ } else {
+ field.set(userObject, record.get(colIdx).val);
+ colIdx++;
+ }
+ }
+
+ return new OneRow(key, userObject);
+ }
+
+ /*
+ * Tests whether the schema resource is a file on the classpath (e.g. avro_schema.avsc)
+ * or a Java class, in which case we try to load it by name via reflection.
+ */
+ private boolean isSchemaOnClasspath(String resource) {
+ if (this.getClass().getClassLoader().getResource("/" + resource) != null) {
+ return true;
+ }
+
+ try {
+ Class.forName(resource);
+ return true;
+ } catch (ClassNotFoundException e) {
+ return false;
+ }
+ }
+}
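To make the reflection-based mapping concrete, below is a minimal, hypothetical DATA-SCHEMA class; it is not part of this commit. Its public field types map to GPDB types through convertJavaToGPDBType as noted in the comments, private fields are skipped, and the class presumably needs a public no-argument constructor so that Utilities.createAnyInstance can build it.

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class CustomWritable implements Writable {
        public long id;                          // -> BIGINT
        public String name;                      // -> TEXT
        public double[] scores = new double[2];  // -> one FLOAT8 field per element
        private int internalCounter;             // private: ignored by WritableResolver

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(id);
            out.writeUTF(name);
            for (double score : scores) {
                out.writeDouble(score);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readLong();
            name = in.readUTF();
            for (int i = 0; i < scores.length; i++) {
                scores[i] = in.readDouble();
            }
        }
    }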
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java
new file mode 100644
index 0000000..6c7ece0
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java
@@ -0,0 +1,43 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+/**
+ * Thrown when there is a data schema problem detected by any plugin that
+ * requires a schema.
+ * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_ON_CLASSPATH} when the specified schema is missing from the CLASSPATH.
+ * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_INDICATED} when a schema was required but was not specified in the pxf uri.
+ */
+public class DataSchemaException extends RuntimeException {
+ public static enum MessageFmt {
+ SCHEMA_NOT_INDICATED("%s requires a data schema to be specified in the " +
+ "pxf uri, but none was found. Please supply it " +
+ "using the DATA-SCHEMA option"),
+ SCHEMA_NOT_ON_CLASSPATH("schema resource \"%s\" is not located on the classpath");
+
+ String format;
+
+ MessageFmt(String format) {
+ this.format = format;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+ }
+
+ private MessageFmt msgFormat;
+
+ /**
+ * Constructs a DataSchemaException.
+ *
+ * @param msgFormat the message format
+ * @param msgArgs the message arguments
+ */
+ public DataSchemaException(MessageFmt msgFormat, String... msgArgs) {
+ super(String.format(msgFormat.getFormat(), (Object[]) msgArgs));
+ this.msgFormat = msgFormat;
+ }
+
+ public MessageFmt getMsgFormat() {
+ return msgFormat;
+ }
+}
\ No newline at end of file
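The enum-based formats above are expanded with String.format, so callers pass only the positional arguments. A brief, hypothetical usage sketch:

    // e.g. thrown by WritableResolver when the DATA-SCHEMA class cannot be located:
    throw new DataSchemaException(
            DataSchemaException.MessageFmt.SCHEMA_NOT_ON_CLASSPATH, "CustomWritable");
    // resulting message: schema resource "CustomWritable" is not located on the classpath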