Posted to commits@hawq.apache.org by sh...@apache.org on 2015/11/03 01:36:13 UTC

[10/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java
new file mode 100644
index 0000000..317040f
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroResolver.java
@@ -0,0 +1,389 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_INDICATED;
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_ON_CLASSPATH;
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
+
+/**
+ * Class AvroResolver handles deserialization of records that were serialized
+ * using the AVRO serialization framework.
+ */
+public class AvroResolver extends Plugin implements ReadResolver {
+    private GenericRecord avroRecord = null;
+    private DatumReader<GenericRecord> reader = null;
+    // member kept to enable reuse, and thus avoid repeated allocation
+    private BinaryDecoder decoder = null;
+    private List<Schema.Field> fields = null;
+    private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
+    private static final String MAPKEY_DELIM = ":";
+    private static final String RECORDKEY_DELIM = ":";
+    private static final String COLLECTION_DELIM = ",";
+    private String collectionDelim;
+    private String mapkeyDelim;
+    private String recordkeyDelim;
+
+    /**
+     * Constructs an AvroResolver. Initializes the Avro data structures: the
+     * Avro record fields information and the Avro record reader. All Avro data
+     * is built from the Avro schema, which is based on the *.avsc file passed
+     * by the user.
+     *
+     * @param input all input parameters coming from the client
+     * @throws IOException if Avro schema could not be retrieved or parsed
+     */
+    public AvroResolver(InputData input) throws IOException {
+        super(input);
+
+        Schema schema = isAvroFile() ? getAvroSchema(new Configuration(),
+                input.getDataSource())
+                : (new Schema.Parser()).parse(openExternalSchema());
+
+        reader = new GenericDatumReader<>(schema);
+        fields = schema.getFields();
+
+        collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
+                : input.getUserProperty("COLLECTION_DELIM");
+        mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
+                : input.getUserProperty("MAPKEY_DELIM");
+        recordkeyDelim = input.getUserProperty("RECORDKEY_DELIM") == null ? RECORDKEY_DELIM
+                : input.getUserProperty("RECORDKEY_DELIM");
+    }
+
+    /**
+     * Returns a list of the fields of one record. Each record field is
+     * represented by a OneField item. A OneField item contains two fields: an
+     * integer representing the field type and a Java Object representing the
+     * field value.
+     */
+    @Override
+    public List<OneField> getFields(OneRow row) throws Exception {
+        avroRecord = makeAvroRecord(row.getData(), avroRecord);
+        List<OneField> record = new LinkedList<OneField>();
+
+        int recordkeyIndex = (inputData.getRecordkeyColumn() == null) ? -1
+                : inputData.getRecordkeyColumn().columnIndex();
+        int currentIndex = 0;
+
+        for (Schema.Field field : fields) {
+            /*
+             * Add the record key if exists
+             */
+            if (currentIndex == recordkeyIndex) {
+                currentIndex += recordkeyAdapter.appendRecordkeyField(record,
+                        inputData, row);
+            }
+
+            currentIndex += populateRecord(record,
+                    avroRecord.get(field.name()), field.schema());
+        }
+
+        return record;
+    }
+
+    /**
+     * Tests whether the Avro records reside inside an AVRO file. If not, they
+     * may reside inside a sequence file, a regular file, ...
+     *
+     * @return whether the resource is an Avro file
+     */
+    boolean isAvroFile() {
+        return inputData.getAccessor().toLowerCase().contains("avro");
+    }
+
+    /**
+     * The record can arrive from one of two sources: a sequence file or an
+     * AVRO file. If it comes from an AVRO file, then it was already obtained
+     * as a {@link GenericRecord} when it was fetched from the file with the
+     * {@link AvroRecordReader}, so in this case a cast is enough. On the other
+     * hand, if the source is a sequence file, then the input parameter obj
+     * hides a byte[] buffer which is in fact one serialized Avro record. Here,
+     * we build the Avro record from the flat buffer, using the AVRO API. Then
+     * (for both cases) in the remaining functions we build a
+     * {@code List<OneField>} record from the Avro record.
+     *
+     * @param obj object holding an Avro record
+     * @param reuseRecord Avro record to be reused to create new record from obj
+     * @return Avro record
+     * @throws IOException if creating the Avro record from byte array failed
+     */
+    GenericRecord makeAvroRecord(Object obj, GenericRecord reuseRecord)
+            throws IOException {
+        if (isAvroFile()) {
+            return (GenericRecord) obj;
+        } else {
+            byte[] bytes = ((BytesWritable) obj).getBytes();
+            decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
+            return reader.read(reuseRecord, decoder);
+        }
+    }
+
+    /**
+     * For a given field in the Avro record we extract its value and insert it
+     * into the output {@code List<OneField>} record. An Avro field can be a
+     * primitive type or a complex type (array, map, record, or union).
+     *
+     * @param record list of fields to be populated
+     * @param fieldValue field value
+     * @param fieldSchema field schema
+     * @return the number of populated fields
+     */
+    int populateRecord(List<OneField> record, Object fieldValue,
+                       Schema fieldSchema) {
+
+        Schema.Type fieldType = fieldSchema.getType();
+        int ret = 0;
+        Object value = fieldValue;
+
+        switch (fieldType) {
+            case ARRAY:
+                if(fieldValue == null) {
+                    return addOneFieldToRecord(record, TEXT, fieldValue);
+                }
+                List<OneField> listRecord = new LinkedList<>();
+                ret = setArrayField(listRecord, fieldValue, fieldSchema);
+                addOneFieldToRecord(record, TEXT, String.format("[%s]",
+                        HdfsUtilities.toString(listRecord, collectionDelim)));
+                break;
+            case MAP:
+                if(fieldValue == null) {
+                    return addOneFieldToRecord(record, TEXT, fieldValue);
+                }
+                List<OneField> mapRecord = new LinkedList<>();
+                ret = setMapField(mapRecord, fieldValue, fieldSchema);
+                addOneFieldToRecord(record, TEXT, String.format("{%s}",
+                        HdfsUtilities.toString(mapRecord, collectionDelim)));
+                break;
+            case RECORD:
+                if(fieldValue == null) {
+                    return addOneFieldToRecord(record, TEXT, fieldValue);
+                }
+                List<OneField> recRecord = new LinkedList<>();
+                ret = setRecordField(recRecord, fieldValue, fieldSchema);
+                addOneFieldToRecord(record, TEXT, String.format("{%s}",
+                        HdfsUtilities.toString(recRecord, collectionDelim)));
+                break;
+            case UNION:
+                /*
+                 * When an Avro field is actually a union, we resolve the type
+                 * of the union element, and delegate the record update via
+                 * recursion
+                 */
+                int unionIndex = GenericData.get().resolveUnion(fieldSchema,
+                        fieldValue);
+                /*
+                 * If the value is null, retrieve the index of the non-null
+                 * data type from the union's type array.
+                 */
+                if (fieldValue == null) {
+                    unionIndex ^= 1;
+                }
+                ret = populateRecord(record, fieldValue,
+                        fieldSchema.getTypes().get(unionIndex));
+                break;
+            case ENUM:
+                ret = addOneFieldToRecord(record, TEXT, value);
+                break;
+            case INT:
+                ret = addOneFieldToRecord(record, INTEGER, value);
+                break;
+            case DOUBLE:
+                ret = addOneFieldToRecord(record, FLOAT8, value);
+                break;
+            case STRING:
+                value = (fieldValue != null) ? String.format("%s", fieldValue)
+                        : null;
+                ret = addOneFieldToRecord(record, TEXT, value);
+                break;
+            case FLOAT:
+                ret = addOneFieldToRecord(record, REAL, value);
+                break;
+            case LONG:
+                ret = addOneFieldToRecord(record, BIGINT, value);
+                break;
+            case BYTES:
+                ret = addOneFieldToRecord(record, BYTEA, value);
+                break;
+            case BOOLEAN:
+                ret = addOneFieldToRecord(record, BOOLEAN, value);
+                break;
+            case FIXED:
+                ret = addOneFieldToRecord(record, BYTEA, value);
+                break;
+            default:
+                break;
+        }
+        return ret;
+    }
+
+    /**
+     * When an Avro field is actually a record, we iterate through its fields.
+     * For each field, the field name and value are added, with the necessary
+     * delimiter, to a local record {@code List<OneField>} complexRecord, from
+     * which we create an object of type OneField and insert it into the
+     * output {@code List<OneField>} record.
+     *
+     * @param record list of fields to be populated
+     * @param value field value
+     * @param recSchema record schema
+     * @return number of populated fields
+     */
+    int setRecordField(List<OneField> record, Object value, Schema recSchema) {
+
+        GenericRecord rec = ((GenericData.Record) value);
+        Schema fieldKeySchema = Schema.create(Schema.Type.STRING);
+        int currentIndex = 0;
+        for (Schema.Field field : recSchema.getFields()) {
+            Schema fieldSchema = field.schema();
+            Object fieldValue = rec.get(field.name());
+            List<OneField> complexRecord = new LinkedList<>();
+            populateRecord(complexRecord, field.name(), fieldKeySchema);
+            populateRecord(complexRecord, fieldValue, fieldSchema);
+            addOneFieldToRecord(record, TEXT,
+                    HdfsUtilities.toString(complexRecord, recordkeyDelim));
+            currentIndex++;
+        }
+        return currentIndex;
+    }
+
+    /**
+     * When an Avro field is actually a map, we resolve the type of the map
+     * value. For each entry, the entry key and value are added to a local
+     * record, from which we create an object of type OneField and insert it
+     * into the output {@code List<OneField>} record.
+     *
+     * The unchecked warning is suppressed to enable us to cast fieldValue to a
+     * Map (since the value schema has been identified to be of type map).
+     *
+     * @param record list of fields to be populated
+     * @param fieldValue field value
+     * @param mapSchema map schema
+     * @return number of populated fields
+     */
+    @SuppressWarnings("unchecked")
+    int setMapField(List<OneField> record, Object fieldValue, Schema mapSchema) {
+        Schema keySchema = Schema.create(Schema.Type.STRING);
+        Schema valueSchema = mapSchema.getValueType();
+        Map<String, ?> avroMap = ((Map<String, ?>) fieldValue);
+        for (Map.Entry<String, ?> entry : avroMap.entrySet()) {
+            List<OneField> complexRecord = new LinkedList<>();
+            populateRecord(complexRecord, entry.getKey(), keySchema);
+            populateRecord(complexRecord, entry.getValue(), valueSchema);
+            addOneFieldToRecord(record, TEXT,
+                    HdfsUtilities.toString(complexRecord, mapkeyDelim));
+        }
+        return avroMap.size();
+    }
+
+    /**
+     * When an Avro field is actually an array, we resolve the type of the array
+     * element, and for each element in the Avro array, we recursively invoke
+     * the population of {@code List<OneField>} record.
+     *
+     * @param record list of fields to be populated
+     * @param fieldValue field value
+     * @param arraySchema array schema
+     * @return number of populated fields
+     */
+    int setArrayField(List<OneField> record, Object fieldValue,
+                      Schema arraySchema) {
+        Schema typeSchema = arraySchema.getElementType();
+        GenericData.Array<?> array = (GenericData.Array<?>) fieldValue;
+        int length = array.size();
+        for (int i = 0; i < length; i++) {
+            populateRecord(record, array.get(i), typeSchema);
+        }
+        return length;
+    }
+
+    /**
+     * Creates the {@link OneField} object and adds it to the output {@code List<OneField>}
+     * record. Strings and byte arrays are held inside special types in the Avro
+     * record so we transfer them to standard types in order to enable their
+     * insertion in the GPDBWritable instance.
+     *
+     * @param record list of fields to be populated
+     * @param gpdbWritableType field type
+     * @param val field value
+     * @return 1 (number of populated fields)
+     */
+    int addOneFieldToRecord(List<OneField> record, DataType gpdbWritableType,
+                            Object val) {
+        OneField oneField = new OneField();
+        oneField.type = gpdbWritableType.getOID();
+        switch (gpdbWritableType) {
+            case BYTEA:
+                if (val instanceof ByteBuffer) {
+                    oneField.val = ((ByteBuffer) val).array();
+                } else {
+                    /*
+                     * The underlying byte array comes from Avro Fixed data.
+                     */
+                    oneField.val = ((GenericData.Fixed) val).bytes();
+                }
+                break;
+            default:
+                oneField.val = val;
+                break;
+        }
+
+        record.add(oneField);
+        return 1;
+    }
+
+    /**
+     * Opens Avro schema based on DATA-SCHEMA parameter.
+     *
+     * @return InputStream of schema file
+     * @throws DataSchemaException if schema file could not be opened
+     */
+    InputStream openExternalSchema() {
+
+        String schemaName = inputData.getUserProperty("DATA-SCHEMA");
+
+        /*
+         * Testing that the schema name was supplied by the user - schema is an
+         * optional property.
+         */
+        if (schemaName == null) {
+            throw new DataSchemaException(SCHEMA_NOT_INDICATED,
+                    this.getClass().getName());
+        }
+
+        /** Testing that the schema resource exists. */
+        if (this.getClass().getClassLoader().getResource(schemaName) == null) {
+            throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
+        }
+        ClassLoader loader = this.getClass().getClassLoader();
+        return loader.getResourceAsStream(schemaName);
+    }
+}
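
For context on the Avro calls above: makeAvroRecord() relies on Avro's
decoder-reuse pattern, which can be sketched in isolation roughly as follows
(a standalone illustration, not part of the commit; the class name is made up
and only standard Avro APIs are used):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;

    import java.io.IOException;

    public class AvroDecodeSketch {
        // kept as members to enable reuse, mirroring AvroResolver
        private BinaryDecoder decoder;
        private GenericRecord reuseRecord;
        private final DatumReader<GenericRecord> reader;

        public AvroDecodeSketch(Schema schema) {
            this.reader = new GenericDatumReader<>(schema);
        }

        /** Rebuilds one Avro record from a serialized byte buffer. */
        public GenericRecord decode(byte[] bytes) throws IOException {
            decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
            reuseRecord = reader.read(reuseRecord, decoder);
            return reuseRecord;
        }
    }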

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java
new file mode 100644
index 0000000..6c8d54c
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReader.java
@@ -0,0 +1,175 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class that provides a line reader from an input stream. Lines are
+ * terminated by '\n' (LF). EOF also terminates an otherwise unterminated line.
+ */
+public class ChunkReader implements Closeable {
+    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private InputStream in;
+    private byte[] buffer;
+    // the number of bytes of real data in the buffer
+    private int bufferLength = 0;
+    // the current position in the buffer
+    private int bufferPosn = 0;
+    private static final byte LF = '\n';
+
+    /**
+     * Constructs a ChunkReader instance
+     *
+     * @param in input stream
+     */
+    public ChunkReader(InputStream in) {
+        this.in = in;
+        this.buffer = new byte[this.bufferSize];
+    }
+
+    /**
+     * Closes the underlying stream.
+     */
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    /*
+     * Internal class used for holding part of a chunk brought by one read()
+     * operation on the input stream. We collect several such nodes in a list
+     * by doing several read operations until we reach the chunk size -
+     * maxBytesToConsume.
+     */
+    private class Node {
+        /* part of a chunk brought in a single inputstream.read() operation */
+        public byte[] slice;
+        /* the size of the slice */
+        public int len;
+    }
+
+    /**
+     * Reads data in chunks of up to DEFAULT_BUFFER_SIZE bytes, until we reach
+     * maxBytesToConsume.
+     *
+     * @param str - output parameter, will contain the read chunk byte array
+     * @param maxBytesToConsume - requested chunk size
+     * @return actual chunk size
+     * @throws IOException if the first byte cannot be read for any reason
+     *         other than the end of the file, if the input stream has been closed,
+     *         or if some other I/O error occurs.
+     */
+    public int readChunk(Writable str, int maxBytesToConsume)
+            throws IOException {
+        ChunkWritable cw = (ChunkWritable) str;
+        List<Node> list = new LinkedList<Node>();
+
+        long bytesConsumed = 0;
+
+        do {
+            if (bufferLength > 0) {
+                int remaining = bufferLength - bufferPosn;
+                Node nd = new Node();
+                nd.slice = new byte[remaining];
+                nd.len = remaining;
+                System.arraycopy(buffer, bufferPosn, nd.slice, 0, nd.len);
+                list.add(nd);
+                bytesConsumed += nd.len;
+            } else {
+                Node nd = new Node();
+                nd.slice = new byte[buffer.length];
+                nd.len = in.read(nd.slice);
+                if (nd.len <= 0) {
+                    break; // EOF
+                }
+                bytesConsumed += nd.len;
+                list.add(nd);
+            }
+
+            bufferLength = bufferPosn = 0;
+
+        } while (bytesConsumed < maxBytesToConsume);
+
+        if (list.size() > 0) {
+            cw.box = new byte[(int) bytesConsumed];
+            int pos = 0;
+            for (int i = 0; i < list.size(); i++) {
+                Node n = list.get(i);
+                System.arraycopy(n.slice, 0, cw.box, pos, n.len);
+                pos += n.len;
+            }
+        }
+
+        return (int) bytesConsumed;
+    }
+
+    /**
+     * Reads a line terminated by LF.
+     *
+     * @param str - output parameter, will contain the read record
+     * @param maxBytesToConsume - the line mustn't exceed this value
+     * @return length of the line read
+     * @throws IOException if the first byte cannot be read for any reason
+     *         other than the end of the file, if the input stream has been closed,
+     *         or if some other I/O error occurs.
+     */
+    public int readLine(Writable str, int maxBytesToConsume) throws IOException {
+        ChunkWritable cw = (ChunkWritable) str;
+        List<Node> list = new LinkedList<Node>();
+
+        boolean newLine = false; // whether a terminating newline was found
+        long bytesConsumed = 0;
+
+        do {
+            int startPosn = bufferPosn; // starting from where we left off the
+                                        // last time
+            if (bufferPosn >= bufferLength) {
+                startPosn = bufferPosn = 0;
+
+                bufferLength = in.read(buffer);
+                if (bufferLength <= 0) {
+                    break; // EOF
+                }
+            }
+
+            for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
+                                                              // newline
+                if (buffer[bufferPosn] == LF) {
+                    newLine = true;
+                    ++bufferPosn; // at next invocation proceed from following
+                                  // byte
+                    break;
+                }
+            }
+
+            int readLength = bufferPosn - startPosn;
+            bytesConsumed += readLength;
+
+            if (readLength > 0) {
+                Node nd = new Node();
+                nd.slice = new byte[readLength];
+                nd.len = readLength;
+                System.arraycopy(buffer, startPosn, nd.slice, 0, nd.len);
+                list.add(nd);
+            }
+        } while (!newLine && bytesConsumed < maxBytesToConsume);
+
+        if (list.size() > 0) {
+            cw.box = new byte[(int) bytesConsumed];
+            int pos = 0;
+            for (int i = 0; i < list.size(); i++) {
+                Node n = list.get(i);
+                System.arraycopy(n.slice, 0, cw.box, pos, n.len);
+                pos += n.len;
+            }
+        }
+
+        return (int) bytesConsumed;
+    }
+}
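
As a rough usage illustration of the reader above (assuming the snippet sits
in the same org.apache.hawq.pxf.plugins.hdfs package so it can reach
ChunkReader and ChunkWritable; the class name is made up):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class ChunkReaderSketch {
        public static void main(String[] args) throws IOException {
            byte[] data = "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8);
            ChunkWritable chunk = new ChunkWritable();
            try (ChunkReader reader = new ChunkReader(new ByteArrayInputStream(data))) {
                int consumed;
                // each call fills chunk.box with one LF-terminated line
                while ((consumed = reader.readLine(chunk, 1024)) > 0) {
                    System.out.print(new String(chunk.box, 0, consumed, StandardCharsets.UTF_8));
                }
            }
        }
    }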

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java
new file mode 100644
index 0000000..590b89c
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkRecordReader.java
@@ -0,0 +1,281 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * ChunkRecordReader is designed for fast reading of a file split. The idea is
+ * to bring chunks of data instead of single records. The chunks contain many
+ * records and the chunk end is not aligned on a record boundary. The size of
+ * the chunk is a class hardcoded parameter - CHUNK_SIZE. This behaviour sets
+ * this reader apart from the other readers which will fetch one record and stop
+ * when reaching a record delimiter.
+ */
+public class ChunkRecordReader implements
+        RecordReader<LongWritable, ChunkWritable> {
+    private static final Log LOG = LogFactory.getLog(ChunkRecordReader.class.getName());
+
+    private CompressionCodecFactory compressionCodecs = null;
+    private long start;
+    private long pos;
+    private long end;
+    private long fileLength;
+    private ChunkReader in;
+    private FSDataInputStream fileIn;
+    private final Seekable filePosition;
+    private int maxLineLength;
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+    private static final int CHUNK_SIZE = 1024 * 1024;
+
+    /**
+     * Translates the FSDataInputStream into a DFSInputStream.
+     */
+    private DFSInputStream getInputStream() {
+        return (DFSInputStream) (fileIn.getWrappedStream());
+    }
+
+    /**
+     * Returns statistics of the input stream's read operation: total bytes
+     * read, bytes read locally, bytes read in short-circuit (directly from file
+     * descriptor).
+     *
+     * @return an instance of ReadStatistics class
+     */
+    public ReadStatistics getReadStatistics() {
+        return getInputStream().getReadStatistics();
+    }
+
+    /**
+     * Constructs a ChunkRecordReader instance.
+     *
+     * @param job the job configuration
+     * @param split contains the file name, begin byte of the split and the
+     *            bytes length
+     * @throws IOException if an I/O error occurs when accessing the file or
+     *             creating input stream to read from it
+     */
+    public ChunkRecordReader(Configuration job, FileSplit split)
+            throws IOException {
+        maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
+        validateLength(maxLineLength);
+        start = split.getStart();
+        end = start + split.getLength();
+        final Path file = split.getPath();
+        compressionCodecs = new CompressionCodecFactory(job);
+        codec = compressionCodecs.getCodec(file);
+
+        // open the file and seek to the start of the split
+        job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+        final FileSystem fs = file.getFileSystem(job);
+        fs.setVerifyChecksum(false);
+        fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
+        fileLength = getInputStream().getFileLength();
+        if (isCompressedInput()) {
+            decompressor = CodecPool.getDecompressor(codec);
+            if (codec instanceof SplittableCompressionCodec) {
+                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+                        fileIn, decompressor, start, end,
+                        SplittableCompressionCodec.READ_MODE.BYBLOCK);
+                in = new ChunkReader(cIn);
+                start = cIn.getAdjustedStart();
+                end = cIn.getAdjustedEnd();
+                filePosition = cIn; // take pos from compressed stream
+            } else {
+                in = new ChunkReader(codec.createInputStream(fileIn,
+                        decompressor));
+                filePosition = fileIn;
+            }
+        } else {
+            fileIn.seek(start);
+            in = new ChunkReader(fileIn);
+            filePosition = fileIn;
+        }
+        /*
+         * If this is not the first split, we always throw away the first
+         * record because we always (except for the last split) read one extra
+         * line in the next() method.
+         */
+        if (start != 0) {
+            start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
+        }
+        this.pos = start;
+    }
+
+    /**
+     * Used by the client of this class to create the 'key' output parameter for
+     * next() method.
+     *
+     * @return an instance of LongWritable
+     */
+    @Override
+    public LongWritable createKey() {
+        return new LongWritable();
+    }
+
+    /**
+     * Used by the client of this class to create the 'value' output parameter
+     * for next() method.
+     *
+     * @return an instance of ChunkWritable
+     */
+    @Override
+    public ChunkWritable createValue() {
+        return new ChunkWritable();
+    }
+
+    /**
+     * Fetches the next data chunk from the file split. The size of the chunk is
+     * a class hardcoded parameter - CHUNK_SIZE. This behaviour sets this reader
+     * apart from the other readers which will fetch one record and stop when
+     * reaching a record delimiter.
+     *
+     * @param key - output parameter. When method returns will contain the key -
+     *            the number of the start byte of the chunk
+     * @param value - output parameter. When method returns will contain the
+     *            value - the chunk, a byte array inside the ChunkWritable
+     *            instance
+     * @return false - when end of split was reached
+     * @throws IOException if an I/O error occurred while reading the next chunk
+     *             or line
+     */
+    @Override
+    public synchronized boolean next(LongWritable key, ChunkWritable value)
+            throws IOException {
+        /*
+         * Usually a record is spread between the end of the current split and
+         * the beginning of the next split. So when reading the last record in
+         * the split we usually need to cross over to the next split. This
+         * tricky logic is implemented in ChunkReader.readLine(). In order not
+         * to rewrite this logic we will read the last chunk in the split with
+         * readLine(). For a split of 120M, reading the last 1M line by line
+         * doesn't have a huge impact. We apply a factor to the last chunk to
+         * make sure we start before the last record.
+         */
+        float factor = 1.5f;
+        int limit = (int) (factor * CHUNK_SIZE);
+        long curPos = getFilePosition();
+        int newSize = 0;
+
+        while (curPos <= end) {
+            key.set(pos);
+
+            if ((end - curPos) > limit) {
+                newSize = in.readChunk(value, CHUNK_SIZE);
+            } else {
+                newSize = in.readLine(value,
+                        Math.max(maxBytesToConsume(pos), maxLineLength));
+            }
+            if (newSize == 0) {
+                break;
+            }
+
+            pos += newSize;
+
+            if (pos == fileLength) { /*
+                                      * in case text file last character is not
+                                      * a linefeed
+                                      */
+                if (value.box[value.box.length - 1] != '\n') {
+                    int newLen = value.box.length + 1;
+                    byte[] tmp = new byte[newLen];
+                    System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
+                    tmp[newLen - 1] = '\n';
+                    value.box = tmp;
+                }
+            }
+
+            return true;
+        }
+        /*
+         * if we got here, either newSize was 0 or curPos is bigger than end
+         */
+
+        return false;
+    }
+
+    /**
+     * Gets the progress within the split.
+     */
+    @Override
+    public synchronized float getProgress() throws IOException {
+        if (start == end) {
+            return 0.0f;
+        } else {
+            return Math.min(1.0f, (getFilePosition() - start)
+                    / (float) (end - start));
+        }
+    }
+
+    /**
+     * Returns the position of the unread tail of the file
+     *
+     * @return pos - start byte of the unread tail of the file
+     */
+    @Override
+    public synchronized long getPos() throws IOException {
+        return pos;
+    }
+
+    /**
+     * Closes the input stream.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        try {
+            if (in != null) {
+                in.close();
+            }
+        } finally {
+            if (decompressor != null) {
+                CodecPool.returnDecompressor(decompressor);
+            }
+        }
+    }
+
+    private void validateLength(int maxLineLength) {
+        if (maxLineLength <= 0)
+            throw new IllegalArgumentException(
+                    "maxLineLength must be a positive value");
+    }
+
+    private boolean isCompressedInput() {
+        return (codec != null);
+    }
+
+    private int maxBytesToConsume(long pos) {
+        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
+                Integer.MAX_VALUE, end - pos);
+    }
+
+    private long getFilePosition() throws IOException {
+        long retVal;
+        if (isCompressedInput() && null != filePosition) {
+            retVal = filePosition.getPos();
+        } else {
+            retVal = pos;
+        }
+        return retVal;
+    }
+} // class ChunkRecordReader
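
The reader is driven through the usual old-API RecordReader cycle; a hedged
sketch, with the Configuration and FileSplit assumed to come from the
surrounding job setup, the helper class name made up, and the sketch assumed
to sit in the same package as ChunkRecordReader and ChunkWritable:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.FileSplit;

    public class ChunkScanSketch {
        /** Drains one file split chunk by chunk and returns the total bytes read. */
        static long drain(Configuration conf, FileSplit split) throws IOException {
            ChunkRecordReader reader = new ChunkRecordReader(conf, split);
            LongWritable key = reader.createKey();      // start offset of each chunk
            ChunkWritable value = reader.createValue(); // the chunk bytes (value.box)
            long total = 0;
            try {
                while (reader.next(key, value)) {
                    total += value.box.length;
                }
            } finally {
                reader.close();
            }
            return total;
        }
    }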

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java
new file mode 100644
index 0000000..936c3ef
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/ChunkWritable.java
@@ -0,0 +1,39 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.lang.UnsupportedOperationException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Just an output buffer for the ChunkRecordReader. It must implement Writable,
+ * otherwise it will not fit into the next() interface method.
+ */
+public class ChunkWritable implements Writable {
+	public byte [] box;
+
+	/**
+     * Serializes the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOutput</code> to serialize this object into.
+     * @throws UnsupportedOperationException this function is not supported
+     */
+	@Override
+    public void write(DataOutput out)  {
+		throw new UnsupportedOperationException("ChunkWritable.write() is not implemented");
+    }
+
+    /**
+     * Deserializes the fields of this object from <code>in</code>.
+     * <p>For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.</p>
+     *
+     * @param in <code>DataInput</code> to deserialize this object from.
+     * @throws UnsupportedOperationException  this function is not supported
+     */
+	@Override
+    public void readFields(DataInput in)  {
+		throw new UnsupportedOperationException("ChunkWritable.readFields() is not implemented");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
new file mode 100644
index 0000000..c80244a
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
@@ -0,0 +1,145 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.Analyzer;
+import org.apache.hawq.pxf.api.AnalyzerStats;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.ReadBridge;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * Analyzer class for HDFS data resources
+ *
+ * Given an HDFS data source (a file, directory, or wildcard pattern), return
+ * statistics about it (number of blocks, number of tuples, etc.).
+ */
+public class HdfsAnalyzer extends Analyzer {
+    private JobConf jobConf;
+    private FileSystem fs;
+    private Log Log;
+
+    /**
+     * Constructs an HdfsAnalyzer object.
+     *
+     * @param inputData all input parameters coming from the client
+     * @throws IOException if HDFS file system cannot be retrieved
+     */
+    public HdfsAnalyzer(InputData inputData) throws IOException {
+        super(inputData);
+        Log = LogFactory.getLog(HdfsAnalyzer.class);
+
+        jobConf = new JobConf(new Configuration(), HdfsAnalyzer.class);
+        fs = FileSystem.get(jobConf);
+    }
+
+    /**
+     * Collects a number of basic statistics based on an estimate. The
+     * statistics are: number of records, number of HDFS blocks, and HDFS block
+     * size.
+     *
+     * @param datapath a data source URI that can appear as a file name, a
+     *        directory name or a wildcard pattern
+     * @return statistics in JSON format
+     * @throws Exception if path is wrong, its metadata cannot be retrieved
+     *                    from file system, or if scanning the first block
+     *                    using the accessor failed
+     */
+    @Override
+    public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
+        long blockSize = 0;
+        long numberOfBlocks;
+        Path path = new Path(HdfsUtilities.absoluteDataPath(datapath));
+
+        ArrayList<InputSplit> splits = getSplits(path);
+
+        for (InputSplit split : splits) {
+            FileSplit fsp = (FileSplit) split;
+            Path filePath = fsp.getPath();
+            FileStatus fileStatus = fs.getFileStatus(filePath);
+            if (fileStatus.isFile()) {
+                blockSize = fileStatus.getBlockSize();
+                break;
+            }
+        }
+
+        // if no file is in path (only dirs), get default block size
+        if (blockSize == 0) {
+            blockSize = fs.getDefaultBlockSize(path);
+        }
+        numberOfBlocks = splits.size();
+
+
+        long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits);
+        AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks);
+
+        //print files size to log when in debug level
+        Log.debug(AnalyzerStats.dataToString(stats, path.toString()));
+
+        return stats;
+    }
+
+    /**
+     * Calculates the number of tuples in a split (block).
+     * Reads one block from HDFS. Exceptions during reading will filter
+     * upwards and be handled in AnalyzerResource.
+     */
+    private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits) throws Exception {
+        long tuples = -1; /* default  - if we are not able to read data */
+        ReadAccessor accessor;
+
+        if (splits.isEmpty()) {
+            return 0;
+        }
+
+        /*
+         * metadata information includes: file split's
+         * start, length and hosts (locations).
+         */
+        FileSplit firstSplit = (FileSplit) splits.get(0);
+        byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit);
+        inputData.setFragmentMetadata(fragmentMetadata);
+        inputData.setDataSource(firstSplit.getPath().toUri().getPath());
+        accessor = ReadBridge.getFileAccessor(inputData);
+
+        if (accessor.openForRead()) {
+            tuples = 0;
+            while (accessor.readNextObject() != null) {
+                tuples++;
+            }
+
+            accessor.closeForRead();
+        }
+
+        return tuples;
+    }
+
+    private ArrayList<InputSplit> getSplits(Path path) throws IOException {
+        PxfInputFormat fformat = new PxfInputFormat();
+        PxfInputFormat.setInputPaths(jobConf, path);
+        InputSplit[] splits = fformat.getSplits(jobConf, 1);
+        ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+        // remove empty splits
+        if (splits != null) {
+            for (InputSplit split : splits) {
+                if (split.getLength() > 0) {
+                    result.add(split);
+                }
+            }
+        }
+
+        return result;
+    }
+}
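
The estimate itself is simply the tuple count of the first block scaled by the
number of non-empty splits: for example, if the first split yields 10,000
tuples and the source spans 8 splits, roughly 80,000 tuples are reported. A
small sketch of that arithmetic, reusing the AnalyzerStats constructor seen
above (helper name made up):

    import org.apache.hawq.pxf.api.AnalyzerStats;

    final class EstimateSketch {
        /** Mirrors getEstimatedStats(): sample one block, then scale by block count. */
        static AnalyzerStats estimate(long blockSize, long numberOfBlocks,
                                      long tuplesInFirstBlock) {
            return new AnalyzerStats(blockSize, numberOfBlocks,
                    tuplesInFirstBlock * numberOfBlocks);
        }
    }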

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
new file mode 100644
index 0000000..fb728fa
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -0,0 +1,112 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * Base class for enforcing the complete access of a file in one accessor.
+ * Since we are not accessing the file using the splittable API, but instead
+ * are using the "simple" stream API, we cannot fetch different parts (splits)
+ * of the file in different segments. Instead, each file access brings the
+ * complete file. If several segments were to access the same file, each one
+ * would return the whole file and each record would appear in the query
+ * result number_of_segments times. To avoid this, we only have one segment
+ * (segment 0) working for this case - enforced with the isWorkingSegment()
+ * method. Naturally this is the less recommended working mode, since we are
+ * not making use of segment parallelism. HDFS accessors for a specific file
+ * type should inherit from this class only if the file they are reading does
+ * not support splitting: a protocol-buffer file, a regular file, ...
+ */
+public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor {
+    private Configuration conf = null;
+    protected InputStream inp = null;
+    private FileSplit fileSplit = null;
+
+    /**
+     * Constructs a HdfsAtomicDataAccessor object.
+     *
+     * @param input all input parameters coming from the client
+     */
+    public HdfsAtomicDataAccessor(InputData input) {
+        // 0. Hold the configuration data
+        super(input);
+
+        // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
+        conf = new Configuration();
+
+        fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+    }
+
+    /**
+     * Opens the file using the non-splittable API for HADOOP HDFS file access.
+     * This means that instead of using a FileInputFormat for access, we use a
+     * Java stream.
+     *
+     * @return true for successful file open, false otherwise
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        if (!isWorkingSegment()) {
+            return false;
+        }
+
+        // input data stream; fs.open() returns an FSDataInputStream
+        FileSystem fs = FileSystem.get(URI.create(inputData.getDataSource()), conf);
+        inp = fs.open(new Path(inputData.getDataSource()));
+
+        return (inp != null);
+    }
+
+    /**
+     * Fetches one record from the file.
+     *
+     * @return a {@link OneRow} record as a Java object. Returns null if none.
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        if (!isWorkingSegment()) {
+            return null;
+        }
+
+        return new OneRow(null, new Object());
+    }
+
+    /**
+     * Closes the access stream when finished reading the file
+     */
+    @Override
+    public void closeForRead() throws Exception {
+        if (!isWorkingSegment()) {
+            return;
+        }
+
+        if (inp != null) {
+            inp.close();
+        }
+    }
+
+    /*
+     * Making sure that only the segment that got assigned the first data
+     * fragment will read the (whole) file.
+     */
+    private boolean isWorkingSegment() {
+        return (fileSplit.getStart() == 0);
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
+                inputData.getUserProperty("COMPRESSION_CODEC"));
+    }
+}
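
A hypothetical minimal subclass (names made up, assumed to sit in the same
package) shows how the working-segment gate is meant to be used: the base
class returns null for every segment except the one whose fragment starts at
byte 0, so a concrete accessor checks the base call before reading from the
stream the base class opened in 'inp':

    import java.io.IOException;

    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.api.utilities.InputData;

    public class WholeFileAccessorSketch extends HdfsAtomicDataAccessor {
        private boolean done = false;

        public WholeFileAccessorSketch(InputData input) {
            super(input);
        }

        @Override
        public OneRow readNextObject() throws IOException {
            if (super.readNextObject() == null) {
                return null; // not the working segment
            }
            if (done) {
                return null; // the whole file was already returned as one row
            }
            done = true;
            // hand the open stream to a matching resolver as a single row
            return new OneRow(null, inp);
        }
    }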

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
new file mode 100644
index 0000000..5c81ef8
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
@@ -0,0 +1,79 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fragmenter class for HDFS data resources.
+ *
+ * Given an HDFS data source (a file, directory, or wildcard pattern), divide
+ * the data into fragments and return a list of them along with a list of
+ * host:port locations for each.
+ */
+public class HdfsDataFragmenter extends Fragmenter {
+    private JobConf jobConf;
+
+    /**
+     * Constructs an HdfsDataFragmenter object.
+     *
+     * @param md all input parameters coming from the client
+     */
+    public HdfsDataFragmenter(InputData md) {
+        super(md);
+
+        jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class);
+    }
+
+    /**
+     * Gets the fragments for a data source URI that can appear as a file name,
+     * a directory name or a wildcard. Returns the data fragments in JSON
+     * format.
+     */
+    @Override
+    public List<Fragment> getFragments() throws Exception {
+        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+        InputSplit[] splits = getSplits(new Path(absoluteDataPath));
+
+        for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
+            FileSplit fsp = (FileSplit) split;
+
+            /*
+             * HD-2547: If the file is empty, an empty split is returned: no
+             * locations and no length.
+             */
+            if (fsp.getLength() <= 0) {
+                continue;
+            }
+
+            String filepath = fsp.getPath().toUri().getPath();
+            String[] hosts = fsp.getLocations();
+
+            /*
+             * metadata information includes: file split's start, length and
+             * hosts (locations).
+             */
+            byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp);
+            Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
+            fragments.add(fragment);
+        }
+
+        return fragments;
+    }
+
+    private InputSplit[] getSplits(Path path) throws IOException {
+        PxfInputFormat format = new PxfInputFormat();
+        PxfInputFormat.setInputPaths(jobConf, path);
+        return format.getSplits(jobConf, 1);
+    }
+}
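
A minimal sketch of how the fragmenter might be invoked, assuming an InputData
already prepared by the PXF service with the requested data source (helper
name made up, same package assumed):

    import java.util.List;

    import org.apache.hawq.pxf.api.Fragment;
    import org.apache.hawq.pxf.api.utilities.InputData;

    final class FragmenterSketch {
        /** Returns the non-empty fragments for the data source named in the request. */
        static List<Fragment> fragmentsFor(InputData request) throws Exception {
            return new HdfsDataFragmenter(request).getFragments();
        }
    }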

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
new file mode 100644
index 0000000..c1f2442
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -0,0 +1,146 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+/**
+ * Accessor for accessing splittable HDFS data sources. HDFS will divide the
+ * file into splits based on an internal decision (by default, the block size is
+ * also the split size).
+ *
+ * Accessors that require such base functionality should extend this class.
+ */
+public abstract class HdfsSplittableDataAccessor extends Plugin implements
+        ReadAccessor {
+    protected Configuration conf = null;
+    protected RecordReader<Object, Object> reader = null;
+    protected InputFormat<?, ?> inputFormat = null;
+    protected ListIterator<InputSplit> iter = null;
+    protected JobConf jobConf = null;
+    protected Object key, data;
+
+    /**
+     * Constructs an HdfsSplittableDataAccessor
+     *
+     * @param input all input parameters coming from the client request
+     * @param inFormat the HDFS {@link InputFormat} the caller wants to use
+     */
+    public HdfsSplittableDataAccessor(InputData input,
+                                      InputFormat<?, ?> inFormat) {
+        super(input);
+        inputFormat = inFormat;
+
+        // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
+        conf = new Configuration();
+
+        // 2. variable required for the splits iteration logic
+        jobConf = new JobConf(conf, HdfsSplittableDataAccessor.class);
+    }
+
+    /**
+     * Fetches the requested fragment (file split) for the current client
+     * request, and sets a record reader for the job.
+     *
+     * @return true if succeeded, false if no more splits to be read
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
+        FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
+        requestSplits.add(fileSplit);
+
+        // Initialize record reader based on current split
+        iter = requestSplits.listIterator(0);
+        return getNextSplit();
+    }
+
+    /**
+     * Specialized accessors will override this method and implement their own
+     * recordReader. For example, a plain delimited text accessor may want to
+     * return a LineRecordReader.
+     *
+     * @param jobConf the hadoop jobconf to use for the selected InputFormat
+     * @param split the input split to be read by the accessor
+     * @return a recordreader to be used for reading the data records of the
+     *         split
+     * @throws IOException if recordreader could not be created
+     */
+    abstract protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException;
+
+    /**
+     * Sets the current split and initializes a RecordReader that feeds from
+     * the split.
+     *
+     * @return true if there is a split to read
+     * @throws IOException if record reader could not be created
+     */
+    @SuppressWarnings(value = "unchecked")
+    protected boolean getNextSplit() throws IOException  {
+        if (!iter.hasNext()) {
+            return false;
+        }
+
+        InputSplit currSplit = iter.next();
+        reader = (RecordReader<Object, Object>) getReader(jobConf, currSplit);
+        key = reader.createKey();
+        data = reader.createValue();
+        return true;
+    }
+
+    /**
+     * Fetches one record from the file. The record is returned as a Java
+     * object.
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        // if there is one more record in the current split
+        if (!reader.next(key, data)) {
+            // the current split is exhausted. try to move to the next split
+            if (getNextSplit()) {
+                // read the first record of the new split
+                if (!reader.next(key, data)) {
+                    // make sure we return nulls
+                    return null;
+                }
+            } else {
+                // make sure we return nulls
+                return null;
+            }
+        }
+
+        /*
+         * The null returns above signal the end of the records sequence: all
+         * the records in all the splits were already read. Otherwise key and
+         * data were set by reader.next(), so we wrap them in a OneRow.
+         */
+        return new OneRow(key, data);
+    }
+
+    /**
+     * When the user has finished reading the file, closes the RecordReader.
+     */
+    @Override
+    public void closeForRead() throws Exception {
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
+                inputData.getUserProperty("COMPRESSION_CODEC"));
+    }
+
+}
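
The getReader() javadoc above notes that a plain delimited text accessor may
simply return a LineRecordReader; a hypothetical minimal subclass along those
lines could look like this (class name made up, same package assumed; the
LineBreakAccessor below follows the same shape but plugs in ChunkRecordReader
instead):

    import java.io.IOException;

    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hawq.pxf.api.utilities.InputData;

    public class SimpleTextAccessorSketch extends HdfsSplittableDataAccessor {
        public SimpleTextAccessorSketch(InputData input) {
            super(input, new TextInputFormat());
            ((TextInputFormat) inputFormat).configure(jobConf);
        }

        @Override
        protected Object getReader(JobConf jobConf, InputSplit split)
                throws IOException {
            // one line per record, as opposed to the chunked reader used below
            return new LineRecordReader(jobConf, (FileSplit) split);
        }
    }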

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java
new file mode 100644
index 0000000..2727a9b
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/LineBreakAccessor.java
@@ -0,0 +1,128 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A PXF Accessor for reading and writing delimited plain text records.
+ */
+public class LineBreakAccessor extends HdfsSplittableDataAccessor implements
+        WriteAccessor {
+    private DataOutputStream dos;
+    private FSDataOutputStream fsdos;
+    private Configuration conf;
+    private FileSystem fs;
+    private Path file;
+    private static Log Log = LogFactory.getLog(LineBreakAccessor.class);
+
+    /**
+     * Constructs a LineBreakAccessor.
+     *
+     * @param input all input parameters coming from the client request
+     */
+    public LineBreakAccessor(InputData input) {
+        super(input, new TextInputFormat());
+        ((TextInputFormat) inputFormat).configure(jobConf);
+    }
+
+    @Override
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
+        return new ChunkRecordReader(jobConf, (FileSplit) split);
+    }
+
+    /**
+     * Opens file for write.
+     */
+    @Override
+    public boolean openForWrite() throws Exception {
+
+        String fileName = inputData.getDataSource();
+        String compressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
+        CompressionCodec codec = null;
+
+        conf = new Configuration();
+        fs = FileSystem.get(conf);
+
+        // get compression codec
+        if (compressCodec != null) {
+            codec = HdfsUtilities.getCodec(conf, compressCodec);
+            String extension = codec.getDefaultExtension();
+            fileName += extension;
+        }
+
+        file = new Path(fileName);
+
+        if (fs.exists(file)) {
+            throw new IOException("file " + file.toString()
+                    + " already exists, can't write data");
+        }
+        Path parent = file.getParent();
+        if (!fs.exists(parent)) {
+            fs.mkdirs(parent);
+            Log.debug("Created new dir " + parent.toString());
+        }
+
+        // create output stream - do not allow overwriting existing file
+        createOutputStream(file, codec);
+
+        return true;
+    }
+
+    /*
+     * Creates the output stream for the given file. If a compression codec is
+     * provided, wraps the stream with it.
+     */
+    private void createOutputStream(Path file, CompressionCodec codec)
+            throws IOException {
+        fsdos = fs.create(file, false);
+        if (codec != null) {
+            dos = new DataOutputStream(codec.createOutputStream(fsdos));
+        } else {
+            dos = fsdos;
+        }
+
+    }
+
+    /**
+     * Writes row into stream.
+     */
+    @Override
+    public boolean writeNextObject(OneRow onerow) throws Exception {
+        dos.write((byte[]) onerow.getData());
+        return true;
+    }
+
+    /**
+     * Closes the output stream when writing is done.
+     */
+    @Override
+    public void closeForWrite() throws Exception {
+        if ((dos != null) && (fsdos != null)) {
+            Log.debug("Closing writing stream for path " + file);
+            dos.flush();
+            /*
+             * From release 0.21.0 sync() is deprecated in favor of hflush(),
+             * which only guarantees that new readers will see all data written
+             * to that point, and hsync(), which makes a stronger guarantee that
+             * the operating system has flushed the data to disk (like POSIX
+             * fsync), although data may still be in the disk cache.
+             */
+            fsdos.hsync();
+            dos.close();
+        }
+    }
+}
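
The matching write-side call sequence for a WriteAccessor such as LineBreakAccessor is: open once, write each row as a byte[] payload, then close (which flushes via hsync()). The driver below is a sketch and not part of this patch; only the WriteAccessor methods and the OneRow payload type come from the code above.

    // Illustrative sketch only - not part of this patch.
    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.api.WriteAccessor;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class WriteLoopSketch {
        /** Writes each line as one row; LineBreakAccessor expects a byte[] payload. */
        public static void writeLines(WriteAccessor accessor, List<String> lines) throws Exception {
            if (!accessor.openForWrite()) {
                throw new IllegalStateException("accessor refused to open for write");
            }
            try {
                for (String line : lines) {
                    byte[] payload = (line + "\n").getBytes(StandardCharsets.UTF_8);
                    accessor.writeNextObject(new OneRow(null, payload));
                }
            } finally {
                accessor.closeForWrite();
            }
        }
    }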

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
new file mode 100644
index 0000000..249a90d
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
@@ -0,0 +1,52 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * An atomic PXF Accessor for reading newline-delimited files whose quoted
+ * fields may contain the field delimiter, the line delimiter, and quotes.
+ * This accessor supports multi-line records that are read from a single
+ * source (non-parallel).
+ */
+public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
+    private BufferedReader reader;
+
+    /**
+     * Constructs a QuotedLineBreakAccessor.
+     *
+     * @param input all input parameters coming from the client request
+     */
+    public QuotedLineBreakAccessor(InputData input) {
+        super(input);
+    }
+
+    @Override
+    public boolean openForRead() throws Exception {
+        if (!super.openForRead()) {
+            return false;
+        }
+        reader = new BufferedReader(new InputStreamReader(inp));
+        return true;
+    }
+
+    /**
+     * Fetches one physical line (possibly only part of a logical record) from
+     * the file. The record is returned as a Java object.
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        /* a null from the parent means this segment is not the working (reading) segment */
+        if (super.readNextObject() == null) {
+            return null;
+        }
+
+        String next_line = reader.readLine();
+        if (next_line == null) /* EOF */ {
+            return null;
+        }
+
+        return new OneRow(null, next_line);
+    }
+}
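
To make the "possibly only part of a logical record" note above concrete, here is a made-up CSV input in which a quoted field spans two physical lines:

    id,comment
    1,"a comment that
    continues on the next line"

readNextObject() returns one OneRow per physical line (three here) on the single working segment; reassembling the logical record is presumably left to the CSV parsing downstream, since the accessor itself does no quote handling.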

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java
new file mode 100644
index 0000000..9ef4cb9
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessor.java
@@ -0,0 +1,215 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+/**
+ * A PXF Accessor for reading and writing SequenceFile records.
+ */
+public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
+        WriteAccessor {
+
+    private Configuration conf;
+    private FileContext fc;
+    private Path file;
+    private CompressionCodec codec;
+    private CompressionType compressionType;
+    private SequenceFile.Writer writer;
+    private LongWritable defaultKey; // used when recordkey is not defined
+
+    private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);
+
+    /**
+     * Constructs a SequenceFileAccessor.
+     *
+     * @param input all input parameters coming from the client request
+     */
+    public SequenceFileAccessor(InputData input) {
+        super(input, new SequenceFileInputFormat<Writable, Writable>());
+    }
+
+    /**
+     * Overrides the parent method to create a specialized record reader.
+     */
+    @Override
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
+        return new SequenceFileRecordReader<Object, Object>(jobConf, (FileSplit) split);
+    }
+
+    @Override
+    public boolean openForWrite() throws Exception {
+        FileSystem fs;
+        Path parent;
+        String fileName = inputData.getDataSource();
+        conf = new Configuration();
+
+        getCompressionCodec(inputData);
+        fileName = updateFileExtension(fileName, codec);
+
+        // construct the output stream
+        file = new Path(fileName);
+        fs = file.getFileSystem(conf);
+        fc = FileContext.getFileContext();
+        defaultKey = new LongWritable(inputData.getSegmentId());
+
+        if (fs.exists(file)) {
+            throw new IOException("file " + file
+                    + " already exists, can't write data");
+        }
+        parent = file.getParent();
+        if (!fs.exists(parent)) {
+            fs.mkdirs(parent);
+            Log.debug("Created new dir " + parent);
+        }
+
+        writer = null;
+        return true;
+    }
+
+    /**
+     * Sets the compression codec and compression type from the user
+     * properties. The default compression type is RECORD. If no codec is
+     * given, the compression type is ignored and NONE is used.
+     *
+     * @param inputData container holding the compression codec and type
+     */
+    private void getCompressionCodec(InputData inputData) {
+
+        String userCompressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
+        String userCompressType = inputData.getUserProperty("COMPRESSION_TYPE");
+        String parsedCompressType = parseCompressionType(userCompressType);
+
+        compressionType = SequenceFile.CompressionType.NONE;
+        codec = null;
+        if (userCompressCodec != null) {
+            codec = HdfsUtilities.getCodec(conf, userCompressCodec);
+
+            try {
+                compressionType = CompressionType.valueOf(parsedCompressType);
+            } catch (IllegalArgumentException e) {
+                // valueOf() never returns null; it throws on an unknown name
+                throw new IllegalArgumentException(
+                        "Illegal value for compression type '"
+                                + parsedCompressType + "'", e);
+            }
+
+            Log.debug("Compression ON: compression codec: "
+                    + userCompressCodec + ", compression type: "
+                    + compressionType);
+        }
+    }
+
+    /*
+     * Parses compression type for sequence file. If null, default to RECORD.
+     * Allowed values: RECORD, BLOCK.
+     */
+    private String parseCompressionType(String compressType) {
+        final String COMPRESSION_TYPE_RECORD = "RECORD";
+        final String COMPRESSION_TYPE_BLOCK = "BLOCK";
+        final String COMPRESSION_TYPE_NONE = "NONE";
+
+        if (compressType == null) {
+            return COMPRESSION_TYPE_RECORD;
+        }
+
+        if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) {
+            throw new IllegalArgumentException(
+                    "Illegal compression type 'NONE'. To disable compression, "
+                            + "remove the COMPRESSION_CODEC parameter.");
+        }
+
+        if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD)
+                && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
+            throw new IllegalArgumentException("Illegal compression type '"
+                    + compressType + "'");
+        }
+
+        return compressType.toUpperCase();
+    }
+
+    /*
+     * Returns fileName with the codec's file extension appended
+     */
+    private String updateFileExtension(String fileName, CompressionCodec codec) {
+
+        if (codec != null) {
+            fileName += codec.getDefaultExtension();
+        }
+        Log.debug("File name for write: " + fileName);
+        return fileName;
+    }
+
+    @Override
+    public boolean writeNextObject(OneRow onerow) throws IOException {
+        Writable value = (Writable) onerow.getData();
+        Writable key = (Writable) onerow.getKey();
+
+        // initialize the writer on the first call, based on the runtime type of onerow.getData()
+        // TODO: verify data is serializable.
+        if (writer == null) {
+            Class<? extends Writable> valueClass = value.getClass();
+            Class<? extends Writable> keyClass = (key == null) ? LongWritable.class
+                    : key.getClass();
+            // create writer - do not allow overwriting existing file
+            writer = SequenceFile.createWriter(fc, conf, file, keyClass,
+                    valueClass, compressionType, codec,
+                    new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
+        }
+
+        try {
+            writer.append((key == null) ? defaultKey : key, value);
+        } catch (IOException e) {
+            Log.error("Failed to write data to file: " + e.getMessage());
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public void closeForWrite() throws Exception {
+        if (writer != null) {
+            writer.sync();
+            /*
+             * From release 0.21.0 sync() is deprecated in favor of hflush(),
+             * which only guarantees that new readers will see all data written
+             * to that point, and hsync(), which makes a stronger guarantee that
+             * the operating system has flushed the data to disk (like POSIX
+             * fsync), although data may still be in the disk cache.
+             */
+            writer.hsync();
+            writer.close();
+        }
+    }
+
+    public CompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    public CompressionCodec getCodec() {
+        return codec;
+    }
+}
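
For reference, a hedged summary of how the two write-side user properties read above combine. COMPRESSION_CODEC is resolved by HdfsUtilities.getCodec(), typically from a fully-qualified codec class name such as org.apache.hadoop.io.compress.BZip2Codec (that example value is an assumption, not taken from this patch):

    no COMPRESSION_CODEC                         -> uncompressed output; a valid COMPRESSION_TYPE is ignored
    COMPRESSION_CODEC only                       -> compressed, type RECORD (the default)
    COMPRESSION_CODEC and COMPRESSION_TYPE=BLOCK -> compressed, type BLOCK
    COMPRESSION_TYPE=NONE or any other value     -> IllegalArgumentException (to disable compression,
                                                    drop COMPRESSION_CODEC instead)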

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
new file mode 100644
index 0000000..efce79f
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
@@ -0,0 +1,74 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.WriteResolver;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
+
+/**
+ * StringPassResolver handles "deserialization" and serialization of
+ * String records. It implements the {@link ReadResolver} and
+ * {@link WriteResolver} interfaces and returns strings as-is.
+ */
+public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver {
+    // for write
+    private OneRow oneRow;
+
+    /**
+     * Constructs a StringPassResolver.
+     *
+     * @param inputData all input parameters coming from the client request
+     */
+    public StringPassResolver(InputData inputData) {
+        super(inputData);
+        oneRow = new OneRow();
+        this.inputData = inputData;
+    }
+
+    /**
+     * Returns a list of the fields of one record.
+     * Each record field is represented by a {@link OneField} item.
+     * OneField item contains two fields: an integer representing the field type and a Java
+     * Object representing the field value.
+     */
+    @Override
+    public List<OneField> getFields(OneRow onerow) {
+        /*
+         * This call forces a whole text line into a single VARCHAR field and
+         * replaces the proper field-separation code (which can be found in
+         * previous revisions). The reasons for doing so at this point are:
+         * 1. performance
+         * 2. the desire not to replicate text parsing logic from the backend
+         *    into Java
+         */
+        List<OneField> record = new LinkedList<OneField>();
+        Object data = onerow.getData();
+        if (data instanceof ChunkWritable) {
+            record.add(new OneField(DataType.BYTEA.getOID(), ((ChunkWritable) data).box));
+        } else {
+            record.add(new OneField(VARCHAR.getOID(), data));
+        }
+        return record;
+    }
+
+    /**
+     * Creates a OneRow object from the singleton list.
+     */
+    @Override
+    public OneRow setFields(List<OneField> record) throws Exception {
+        if (((byte[]) record.get(0).val).length == 0) {
+            return null;
+        }
+
+        oneRow.setData(record.get(0).val);
+        return oneRow;
+    }
+}
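
A minimal sketch of the write-side hand-off that this resolver participates in: the service builds a single-field record, setFields() wraps its raw bytes in a OneRow, and a WriteAccessor writes it out. The driver below is illustrative only and not part of this patch; the single-field record shape and the null return for an empty record match the code above.

    // Illustrative sketch only - not part of this patch.
    import org.apache.hawq.pxf.api.OneField;
    import org.apache.hawq.pxf.api.OneRow;
    import org.apache.hawq.pxf.api.WriteAccessor;
    import org.apache.hawq.pxf.api.WriteResolver;
    import org.apache.hawq.pxf.api.io.DataType;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;

    public class WriteResolveSketch {
        /** Resolves one text line into a OneRow and hands it to the accessor. */
        public static void writeOneLine(WriteResolver resolver, WriteAccessor accessor, String line)
                throws Exception {
            // StringPassResolver expects a single field whose value is the raw bytes of the line.
            byte[] payload = (line + "\n").getBytes(StandardCharsets.UTF_8);
            List<OneField> record = Collections.singletonList(
                    new OneField(DataType.BYTEA.getOID(), payload));
            OneRow row = resolver.setFields(record);  // null signals an empty (end-of-data) record
            if (row != null) {
                accessor.writeNextObject(row);
            }
        }
    }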

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java
new file mode 100644
index 0000000..fa8da82
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/WritableResolver.java
@@ -0,0 +1,220 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+
+/**
+ * WritableResolver handles serialization and deserialization of records
+ * that were serialized using Hadoop's Writable serialization framework.
+ *
+ * A field named 'recordkey' is treated as a key of the given row, and not as
+ * part of the data schema. See {@link RecordkeyAdapter}.
+ */
+public class WritableResolver extends Plugin implements ReadResolver, WriteResolver {
+    private static final int RECORDKEY_UNDEFINED = -1;
+    private static final Log LOG = LogFactory.getLog(WritableResolver.class);
+    private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
+    private int recordkeyIndex;
+    // reflection fields
+    private Object userObject = null;
+    private Field[] fields = null;
+
+
+    /**
+     * Constructs a WritableResolver.
+     *
+     * @param input all input parameters coming from the client
+     * @throws Exception if the schema is missing, cannot be found on the
+     *                   classpath, or fails to instantiate
+     */
+    public WritableResolver(InputData input) throws Exception {
+        super(input);
+
+        String schemaName = inputData.getUserProperty("DATA-SCHEMA");
+
+        /* Verify that a schema name was supplied by the user - DATA-SCHEMA is an optional user property, so it must be checked here. */
+        if (schemaName == null) {
+            throw new DataSchemaException(SCHEMA_NOT_INDICATED, this.getClass().getName());
+        }
+
+        /* Verify that the schema resource exists. */
+        if (!isSchemaOnClasspath(schemaName)) {
+            throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
+        }
+
+        userObject = Utilities.createAnyInstance(schemaName);
+        fields = userObject.getClass().getDeclaredFields();
+        recordkeyIndex = (inputData.getRecordkeyColumn() == null)
+                ? RECORDKEY_UNDEFINED
+                        : inputData.getRecordkeyColumn().columnIndex();
+
+        // fields details:
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < fields.length; i++) {
+                Field field = fields[i];
+                String javaType = field.getType().getName();
+                boolean isPrivate = Modifier.isPrivate(field.getModifiers());
+
+                LOG.debug("Field #" + i + ", name: " + field.getName() +
+                        " type: " + javaType + ", " +
+                        (isArray(javaType) ? "Array" : "Primitive") + ", " +
+                        (isPrivate ? "Private" : "accessible") + " field");
+            }
+        }
+    }
+
+    private boolean isArray(String javaType) {
+        return (javaType.startsWith("[") && !"[B".equals(javaType));
+    }
+
+    @Override
+    public List<OneField> getFields(OneRow onerow) throws Exception {
+        userObject = onerow.getData();
+        List<OneField> record = new LinkedList<OneField>();
+
+        int currentIdx = 0;
+        for (Field field : fields) {
+            if (currentIdx == recordkeyIndex) {
+                currentIdx += recordkeyAdapter.appendRecordkeyField(record, inputData, onerow);
+            }
+
+            if (Modifier.isPrivate(field.getModifiers())) {
+                continue;
+            }
+
+            currentIdx += populateRecord(record, field);
+        }
+
+        return record;
+    }
+
+    int setArrayField(List<OneField> record, int dataType, Field reflectedField) throws IllegalAccessException {
+        Object array = reflectedField.get(userObject);
+        int length = Array.getLength(array);
+        for (int j = 0; j < length; j++) {
+            record.add(new OneField(dataType, Array.get(array, j)));
+        }
+        return length;
+    }
+
+    /*
+     * Given a Java type name, converts it to the corresponding output field
+     * DataType.
+     */
+    private DataType convertJavaToGPDBType(String type) {
+        if ("boolean".equals(type) || "[Z".equals(type)) {
+            return BOOLEAN;
+        }
+        if ("int".equals(type) || "[I".equals(type)) {
+            return INTEGER;
+        }
+        if ("double".equals(type) || "[D".equals(type)) {
+            return FLOAT8;
+        }
+        if ("java.lang.String".equals(type) || "[Ljava.lang.String;".equals(type)) {
+            return TEXT;
+        }
+        if ("float".equals(type) || "[F".equals(type)) {
+            return REAL;
+        }
+        if ("long".equals(type) || "[J".equals(type)) {
+            return BIGINT;
+        }
+        if ("[B".equals(type)) {
+            return BYTEA;
+        }
+        if ("short".equals(type) || "[S".equals(type)) {
+            return SMALLINT;
+        }
+        throw new UnsupportedTypeException("Type " + type + " is not supported by GPDBWritable");
+    }
+
+    int populateRecord(List<OneField> record, Field field) throws BadRecordException {
+        String javaType = field.getType().getName();
+        try {
+            DataType dataType = convertJavaToGPDBType(javaType);
+            if (isArray(javaType)) {
+                return setArrayField(record, dataType.getOID(), field);
+            }
+            record.add(new OneField(dataType.getOID(), field.get(userObject)));
+            return 1;
+        } catch (IllegalAccessException ex) {
+            throw new BadRecordException(ex);
+        }
+    }
+
+    /**
+     * Sets the fields of the custom Writable schema object from the record
+     * and creates a OneRow object.
+     */
+    @Override
+    public OneRow setFields(List<OneField> record) throws Exception {
+        Writable key = null;
+
+        int colIdx = 0;
+        for (Field field : fields) {
+            /*
+             * extract recordkey based on the column descriptor type
+             * and add to OneRow.key
+             */
+            if (colIdx == recordkeyIndex) {
+                key = recordkeyAdapter.convertKeyValue(record.get(colIdx).val);
+                colIdx++;
+            }
+
+            if (Modifier.isPrivate(field.getModifiers())) {
+                continue;
+            }
+
+            String javaType = field.getType().getName();
+            convertJavaToGPDBType(javaType);
+            if (isArray(javaType)) {
+                Object value = field.get(userObject);
+                int length = Array.getLength(value);
+                for (int j = 0; j < length; j++, colIdx++) {
+                    Array.set(value, j, record.get(colIdx).val);
+                }
+            } else {
+                field.set(userObject, record.get(colIdx).val);
+                colIdx++;
+            }
+        }
+
+        return new OneRow(key, userObject);
+    }
+
+    /*
+     * Tests whether the schema resource is a file on the classpath (e.g.
+     * avro_schema.avsc) or a Java class, in which case we try to load the
+     * class by name.
+     */
+    private boolean isSchemaOnClasspath(String resource) {
+        if (this.getClass().getClassLoader().getResource("/" + resource) != null) {
+            return true;
+        }
+
+        try {
+            Class.forName(resource);
+            return true;
+        } catch (ClassNotFoundException e) {
+            return false;
+        }
+    }
+}
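
To illustrate what WritableResolver reflects over, here is a hedged sketch of the kind of class a user might name in the DATA-SCHEMA option. The class and field names are made up; the field types are limited to those convertJavaToGPDBType() accepts, non-private fields map to output columns in declaration order, and array fields expand to one output field per element. A no-argument constructor is assumed to be required so Utilities.createAnyInstance() can instantiate the class.

    // Illustrative sketch only - not part of this patch; names are made up.
    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class CustomWritableSketch implements Writable {
        // Non-private fields become output columns, in declaration order.
        public int id;             // -> INTEGER
        public String name;        // -> TEXT
        public double[] readings;  // -> one FLOAT8 field per array element

        // A no-argument constructor is assumed to be needed so the resolver can
        // instantiate the class reflectively (Utilities.createAnyInstance).
        public CustomWritableSketch() {
            this.readings = new double[0];
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name == null ? "" : name);
            out.writeInt(readings.length);
            for (double d : readings) {
                out.writeDouble(d);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
            readings = new double[in.readInt()];
            for (int i = 0; i < readings.length; i++) {
                readings[i] = in.readDouble();
            }
        }
    }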

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java
new file mode 100644
index 0000000..6c7ece0
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/DataSchemaException.java
@@ -0,0 +1,43 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+/**
+ * Thrown when there is a data schema problem detected by any plugin that
+ * requires a schema.
+ * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_ON_CLASSPATH} when the specified schema is missing from the CLASSPATH.
+ * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_INDICATED} when a schema was required but was not specified in the pxf uri.
+ */
+public class DataSchemaException extends RuntimeException {
+    public enum MessageFmt {
+        SCHEMA_NOT_INDICATED("%s requires a data schema to be specified in the " +
+                "pxf uri, but none was found. Please supply it " +
+                "using the DATA-SCHEMA option."),
+        SCHEMA_NOT_ON_CLASSPATH("schema resource \"%s\" is not located on the classpath");
+
+        String format;
+
+        MessageFmt(String format) {
+            this.format = format;
+        }
+
+        public String getFormat() {
+            return format;
+        }
+    }
+
+    private MessageFmt msgFormat;
+
+    /**
+     * Constructs a DataSchemaException.
+     *
+     * @param msgFormat the message format
+     * @param msgArgs the message arguments
+     */
+    public DataSchemaException(MessageFmt msgFormat, String... msgArgs) {
+        super(String.format(msgFormat.getFormat(), (Object[]) msgArgs));
+        this.msgFormat = msgFormat;
+    }
+
+    public MessageFmt getMsgFormat() {
+        return msgFormat;
+    }
+}
\ No newline at end of file
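
Finally, a small sketch of how the exception above is meant to be used, mirroring the two call sites in WritableResolver earlier in this patch; each message format takes a single argument (the plugin class name or the schema resource name). The wrapper class is illustrative only and not part of this patch.

    // Illustrative sketch only - not part of this patch.
    import org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException;

    import static org.apache.hawq.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_ON_CLASSPATH;

    public class SchemaCheckSketch {
        /** Throws the classpath variant when the named schema resource cannot be found. */
        public static void requireOnClasspath(String schemaResource, boolean found) {
            if (!found) {
                // Formats to: schema resource "<schemaResource>" is not located on the classpath
                throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaResource);
            }
        }
    }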