Posted to commits@hawq.apache.org by sh...@apache.org on 2015/11/03 01:36:14 UTC

[11/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor
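
For orientation, here is a minimal sketch of what the package namespace refactor means for code importing the PXF plugin classes, assuming the classes under com.pivotal.pxf move to an org.apache.hawq.pxf namespace (an assumption; the added counterparts of the files deleted below are not shown in this part of the commit):

    // Hypothetical before/after import for one of the relocated plugin classes;
    // the new package name is assumed, not shown in this hunk.
    import com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter;        // before the refactor
    import org.apache.hawq.pxf.plugins.hdfs.HdfsDataFragmenter;    // after (assumed)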

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
deleted file mode 100644
index 0f8f908..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.Analyzer;
-import com.pivotal.pxf.api.AnalyzerStats;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.service.ReadBridge;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hdfs.utilities.PxfInputFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-/**
- * Analyzer class for HDFS data resources
- *
- * Given an HDFS data source (a file, directory, or wild card pattern)
- * return statistics about it (number of blocks, number of tuples, etc.)
- */
-public class HdfsAnalyzer extends Analyzer {
-    private JobConf jobConf;
-    private FileSystem fs;
-    private Log Log;
-
-    /**
-     * Constructs an HdfsAnalyzer object.
-     *
-     * @param inputData all input parameters coming from the client
-     * @throws IOException if HDFS file system cannot be retrieved
-     */
-    public HdfsAnalyzer(InputData inputData) throws IOException {
-        super(inputData);
-        Log = LogFactory.getLog(HdfsAnalyzer.class);
-
-        jobConf = new JobConf(new Configuration(), HdfsAnalyzer.class);
-        fs = FileSystem.get(jobConf);
-    }
-
-    /**
-     * Collects a number of basic statistics based on an estimate. Statistics
-     * are: number of records, number of hdfs blocks and hdfs block size.
-     *
-     * @param datapath a data source URI that can appear as a file
-     *        name, a directory name or a wildcard pattern
-     * @return statistics in JSON format
-     * @throws Exception if path is wrong, its metadata cannot be retrieved
-     *                    from file system, or if scanning the first block
-     *                    using the accessor failed
-     */
-    @Override
-    public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
-        long blockSize = 0;
-        long numberOfBlocks;
-        Path path = new Path(HdfsUtilities.absoluteDataPath(datapath));
-
-        ArrayList<InputSplit> splits = getSplits(path);
-
-        for (InputSplit split : splits) {
-            FileSplit fsp = (FileSplit) split;
-            Path filePath = fsp.getPath();
-            FileStatus fileStatus = fs.getFileStatus(filePath);
-            if (fileStatus.isFile()) {
-                blockSize = fileStatus.getBlockSize();
-                break;
-            }
-        }
-
-        // if no file is in path (only dirs), get default block size
-        if (blockSize == 0) {
-            blockSize = fs.getDefaultBlockSize(path);
-        }
-        numberOfBlocks = splits.size();
-
-
-        long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits);
-        AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks);
-
-        //print files size to log when in debug level
-        Log.debug(AnalyzerStats.dataToString(stats, path.toString()));
-
-        return stats;
-    }
-
-    /**
-     * Calculates the number of tuples in a split (block).
-     * Reads one block from HDFS. Exceptions thrown during reading
-     * propagate upwards and are handled in AnalyzerResource.
-     */
-    private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits) throws Exception {
-        long tuples = -1; /* default  - if we are not able to read data */
-        ReadAccessor accessor;
-
-        if (splits.isEmpty()) {
-            return 0;
-        }
-
-        /*
-         * metadata information includes: file split's
-         * start, length and hosts (locations).
-         */
-        FileSplit firstSplit = (FileSplit) splits.get(0);
-        byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit);
-        inputData.setFragmentMetadata(fragmentMetadata);
-        inputData.setDataSource(firstSplit.getPath().toUri().getPath());
-        accessor = ReadBridge.getFileAccessor(inputData);
-
-        if (accessor.openForRead()) {
-            tuples = 0;
-            while (accessor.readNextObject() != null) {
-                tuples++;
-            }
-
-            accessor.closeForRead();
-        }
-
-        return tuples;
-    }
-
-    private ArrayList<InputSplit> getSplits(Path path) throws IOException {
-        PxfInputFormat fformat = new PxfInputFormat();
-        PxfInputFormat.setInputPaths(jobConf, path);
-        InputSplit[] splits = fformat.getSplits(jobConf, 1);
-        ArrayList<InputSplit> result = new ArrayList<InputSplit>();
-
-        // remove empty splits
-        if (splits != null) {
-            for (InputSplit split : splits) {
-                if (split.getLength() > 0) {
-                    result.add(split);
-                }
-            }
-        }
-
-        return result;
-    }
-}
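
As a worked illustration of the estimate that getEstimatedStats() produces (the numbers are assumed, not taken from this commit): if getSplits() returns 8 non-empty splits, the first file reports a 128 MB block size, and scanning the first split with the accessor counts roughly 1,000,000 tuples, then the returned AnalyzerStats carries blockSize = 128 MB, numberOfBlocks = 8, and an estimated 8 * 1,000,000 = 8,000,000 tuples.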

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
deleted file mode 100644
index 4a43c5f..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-
-/**
- * Base class for enforcing the complete access of a file in one accessor.
- * Since we are not accessing the file using the splittable API, but instead are
- * using the "simple" stream API, we cannot fetch different parts (splits) of
- * the file in different segments. Instead, each file access brings the complete
- * file. If several segments were to access the same file, each one would return
- * the whole file, and every record would appear number_of_segments times in the
- * query result. To avoid this, only one segment (segment 0) handles such files,
- * enforced by the isWorkingSegment() method. Naturally this is the less
- * recommended working mode, since it makes no use of segment parallelism. HDFS
- * accessors for a specific file type should inherit from this class only if the
- * file they are reading does not support splitting: a protocol-buffer file,
- * regular file, ...
- */
-public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor {
-    private Configuration conf = null;
-    protected InputStream inp = null;
-    private FileSplit fileSplit = null;
-
-    /**
-     * Constructs a HdfsAtomicDataAccessor object.
-     *
-     * @param input all input parameters coming from the client
-     */
-    public HdfsAtomicDataAccessor(InputData input) {
-        // 0. Hold the configuration data
-        super(input);
-
-        // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
-        conf = new Configuration();
-
-        fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
-    }
-
-    /**
-     * Opens the file using the non-splittable API for HADOOP HDFS file access
-     * This means that instead of using a FileInputFormat for access, we use a
-     * Java stream.
-     *
-     * @return true for successful file open, false otherwise
-     */
-    @Override
-    public boolean openForRead() throws Exception {
-        if (!isWorkingSegment()) {
-            return false;
-        }
-
-        // input data stream
-        FileSystem fs = FileSystem.get(URI.create(inputData.getDataSource()), conf); // fs.open() below returns an FSDataInputStream
-        inp = fs.open(new Path(inputData.getDataSource()));
-
-        return (inp != null);
-    }
-
-    /**
-     * Fetches one record from the file.
-     *
-     * @return a {@link OneRow} record as a Java object. Returns null if none.
-     */
-    @Override
-    public OneRow readNextObject() throws IOException {
-        if (!isWorkingSegment()) {
-            return null;
-        }
-
-        return new OneRow(null, new Object());
-    }
-
-    /**
-     * Closes the access stream when finished reading the file
-     */
-    @Override
-    public void closeForRead() throws Exception {
-        if (!isWorkingSegment()) {
-            return;
-        }
-
-        if (inp != null) {
-            inp.close();
-        }
-    }
-
-    /*
-     * Making sure that only the segment that got assigned the first data
-     * fragment will read the (whole) file.
-     */
-    private boolean isWorkingSegment() {
-        return (fileSplit.getStart() == 0);
-    }
-
-    @Override
-    public boolean isThreadSafe() {
-        return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
-        								  inputData.getUserProperty("COMPRESSION_CODEC"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
deleted file mode 100644
index 1bf5aab..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.Fragment;
-import com.pivotal.pxf.api.Fragmenter;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hdfs.utilities.PxfInputFormat;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Fragmenter class for HDFS data resources.
- *
- * Given an HDFS data source (a file, directory, or wild card pattern) divide
- * the data into fragments and return a list of them along with a list of
- * host:port locations for each.
- */
-public class HdfsDataFragmenter extends Fragmenter {
-    private JobConf jobConf;
-
-    /**
-     * Constructs an HdfsDataFragmenter object.
-     *
-     * @param md all input parameters coming from the client
-     */
-    public HdfsDataFragmenter(InputData md) {
-        super(md);
-
-        jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class);
-    }
-
-    /**
-     * Gets the fragments for a data source URI that can appear as a file name,
-     * a directory name or a wildcard. Returns the data fragments in JSON
-     * format.
-     */
-    @Override
-    public List<Fragment> getFragments() throws Exception {
-        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
-        InputSplit[] splits = getSplits(new Path(absoluteDataPath));
-
-        for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
-            FileSplit fsp = (FileSplit) split;
-
-            /*
-             * HD-2547: If the file is empty, an empty split is returned: no
-             * locations and no length.
-             */
-            if (fsp.getLength() <= 0) {
-                continue;
-            }
-
-            String filepath = fsp.getPath().toUri().getPath();
-            String[] hosts = fsp.getLocations();
-
-            /*
-             * metadata information includes: file split's start, length and
-             * hosts (locations).
-             */
-            byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp);
-            Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
-            fragments.add(fragment);
-        }
-
-        return fragments;
-    }
-
-    private InputSplit[] getSplits(Path path) throws IOException {
-        PxfInputFormat format = new PxfInputFormat();
-        PxfInputFormat.setInputPaths(jobConf, path);
-        return format.getSplits(jobConf, 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
deleted file mode 100644
index 744342d..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.ListIterator;
-
-/**
- * Accessor for accessing splittable HDFS data sources. HDFS will divide the
- * file into splits based on an internal decision (by default, the block size is
- * also the split size).
- *
- * Accessors that require such base functionality should extend this class.
- */
-public abstract class HdfsSplittableDataAccessor extends Plugin implements
-        ReadAccessor {
-    protected Configuration conf = null;
-    protected RecordReader<Object, Object> reader = null;
-    protected InputFormat<?, ?> inputFormat = null;
-    protected ListIterator<InputSplit> iter = null;
-    protected JobConf jobConf = null;
-    protected Object key, data;
-
-    /**
-     * Constructs an HdfsSplittableDataAccessor
-     *
-     * @param input all input parameters coming from the client request
-     * @param inFormat the HDFS {@link InputFormat} the caller wants to use
-     */
-    public HdfsSplittableDataAccessor(InputData input,
-                                      InputFormat<?, ?> inFormat) {
-        super(input);
-        inputFormat = inFormat;
-
-        // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files
-        conf = new Configuration();
-
-        // 2. variable required for the splits iteration logic
-        jobConf = new JobConf(conf, HdfsSplittableDataAccessor.class);
-    }
-
-    /**
-     * Fetches the requested fragment (file split) for the current client
-     * request, and sets a record reader for the job.
-     *
-     * @return true if succeeded, false if no more splits to be read
-     */
-    @Override
-    public boolean openForRead() throws Exception {
-        LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
-        FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
-        requestSplits.add(fileSplit);
-
-        // Initialize record reader based on current split
-        iter = requestSplits.listIterator(0);
-        return getNextSplit();
-    }
-
-    /**
-     * Specialized accessors will override this method and implement their own
-     * recordReader. For example, a plain delimited text accessor may want to
-     * return a LineRecordReader.
-     *
-     * @param jobConf the hadoop jobconf to use for the selected InputFormat
-     * @param split the input split to be read by the accessor
-     * @return a recordreader to be used for reading the data records of the
-     *         split
-     * @throws IOException if recordreader could not be created
-     */
-    abstract protected Object getReader(JobConf jobConf, InputSplit split)
-            throws IOException;
-
-    /**
-     * Sets the current split and initializes a RecordReader who feeds from the
-     * split
-     *
-     * @return true if there is a split to read
-     * @throws IOException if record reader could not be created
-     */
-    @SuppressWarnings(value = "unchecked")
-    protected boolean getNextSplit() throws IOException  {
-        if (!iter.hasNext()) {
-            return false;
-        }
-
-        InputSplit currSplit = iter.next();
-        reader = (RecordReader<Object, Object>) getReader(jobConf, currSplit);
-        key = reader.createKey();
-        data = reader.createValue();
-        return true;
-    }
-
-    /**
-     * Fetches one record from the file. The record is returned as a Java
-     * object.
-     */
-    @Override
-    public OneRow readNextObject() throws IOException {
-        // if there is one more record in the current split
-        if (!reader.next(key, data)) {
-            // the current split is exhausted. try to move to the next split
-            if (getNextSplit()) {
-                // read the first record of the new split
-                if (!reader.next(key, data)) {
-                    // make sure we return nulls
-                    return null;
-                }
-            } else {
-                // make sure we return nulls
-                return null;
-            }
-        }
-
-        /*
-         * Reaching this point means a record was read successfully into
-         * key/data, either from the current split or from the next one; the
-         * null returns above signal the end of the records sequence.
-         */
-        return new OneRow(key, data);
-    }
-
-    /**
-     * Closes the RecordReader when the user has finished reading the file.
-     */
-    @Override
-    public void closeForRead() throws Exception {
-        if (reader != null) {
-            reader.close();
-        }
-    }
-
-    @Override
-    public boolean isThreadSafe() {
-        return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
-                inputData.getUserProperty("COMPRESSION_CODEC"));
-    }
-
-}
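
To make the abstract getReader() contract above concrete, here is a minimal sketch of the override a plain delimited text accessor could supply, using Hadoop's old-API org.apache.hadoop.mapred.LineRecordReader as the Javadoc suggests (illustrative only; the accessors in this commit plug in their own record readers):

    // Hypothetical subclass override: hand the split to a LineRecordReader, which
    // HdfsSplittableDataAccessor then drives through reader.next(key, data).
    @Override
    protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
        return new LineRecordReader(jobConf, (FileSplit) split);
    }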

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
deleted file mode 100644
index 32002a1..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.WriteAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.*;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * A PXF Accessor for reading delimited plain text records.
- */
-public class LineBreakAccessor extends HdfsSplittableDataAccessor implements
-        WriteAccessor {
-    private DataOutputStream dos;
-    private FSDataOutputStream fsdos;
-    private Configuration conf;
-    private FileSystem fs;
-    private Path file;
-    private static Log Log = LogFactory.getLog(LineBreakAccessor.class);
-
-    /**
-     * Constructs a LineReaderAccessor.
-     *
-     * @param input all input parameters coming from the client request
-     */
-    public LineBreakAccessor(InputData input) {
-        super(input, new TextInputFormat());
-        ((TextInputFormat) inputFormat).configure(jobConf);
-    }
-
-    @Override
-    protected Object getReader(JobConf jobConf, InputSplit split)
-            throws IOException {
-        return new ChunkRecordReader(jobConf, (FileSplit) split);
-    }
-
-    /**
-     * Opens file for write.
-     */
-    @Override
-    public boolean openForWrite() throws Exception {
-
-        String fileName = inputData.getDataSource();
-        String compressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
-        CompressionCodec codec = null;
-
-        conf = new Configuration();
-        fs = FileSystem.get(conf);
-
-        // get compression codec
-        if (compressCodec != null) {
-            codec = HdfsUtilities.getCodec(conf, compressCodec);
-            String extension = codec.getDefaultExtension();
-            fileName += extension;
-        }
-
-        file = new Path(fileName);
-
-        if (fs.exists(file)) {
-            throw new IOException("file " + file.toString()
-                    + " already exists, can't write data");
-        }
-        org.apache.hadoop.fs.Path parent = file.getParent();
-        if (!fs.exists(parent)) {
-            fs.mkdirs(parent);
-            Log.debug("Created new dir " + parent.toString());
-        }
-
-        // create output stream - do not allow overwriting existing file
-        createOutputStream(file, codec);
-
-        return true;
-    }
-
-    /*
-     * Creates output stream from given file. If compression codec is provided,
-     * wrap it around stream.
-     */
-    private void createOutputStream(Path file, CompressionCodec codec)
-            throws IOException {
-        fsdos = fs.create(file, false);
-        if (codec != null) {
-            dos = new DataOutputStream(codec.createOutputStream(fsdos));
-        } else {
-            dos = fsdos;
-        }
-
-    }
-
-    /**
-     * Writes row into stream.
-     */
-    @Override
-    public boolean writeNextObject(OneRow onerow) throws Exception {
-        dos.write((byte[]) onerow.getData());
-        return true;
-    }
-
-    /**
-     * Closes the output stream after done writing.
-     */
-    @Override
-    public void closeForWrite() throws Exception {
-        if ((dos != null) && (fsdos != null)) {
-            Log.debug("Closing writing stream for path " + file);
-            dos.flush();
-            /*
-             * From release 0.21.0 sync() is deprecated in favor of hflush(),
-             * which only guarantees that new readers will see all data written
-             * to that point, and hsync(), which makes a stronger guarantee that
-             * the operating system has flushed the data to disk (like POSIX
-             * fsync), although data may still be in the disk cache.
-             */
-            fsdos.hsync();
-            dos.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
deleted file mode 100644
index 13880b8..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * An (atomic) PXF Accessor for reading \n delimited files with quoted
- * field delimiter, line delimiter, and quotes. This accessor supports
- * multi-line records that are read from a single source (non-parallel).
- */
-public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
-    private BufferedReader reader;
-
-    /**
-     * Constructs a QuotedLineBreakAccessor.
-     *
-     * @param input all input parameters coming from the client request
-     */
-    public QuotedLineBreakAccessor(InputData input) {
-        super(input);
-    }
-
-    @Override
-    public boolean openForRead() throws Exception {
-        if (!super.openForRead()) {
-            return false;
-        }
-        reader = new BufferedReader(new InputStreamReader(inp));
-        return true;
-    }
-
-    /**
-     * Fetches one record (maybe partial) from the file. The record is returned as a Java object.
-     */
-    @Override
-    public OneRow readNextObject() throws IOException {
-        if (super.readNextObject() == null) /* check if working segment */ {
-            return null;
-        }
-
-        String next_line = reader.readLine();
-        if (next_line == null) /* EOF */ {
-            return null;
-        }
-
-        return new OneRow(null, next_line);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
deleted file mode 100644
index 5f3f3dd..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.WriteAccessor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-import java.util.EnumSet;
-
-/**
- * A PXF Accessor for reading and writing Sequence File records
- */
-public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
-        WriteAccessor {
-
-    private Configuration conf;
-    private FileContext fc;
-    private Path file;
-    private CompressionCodec codec;
-    private CompressionType compressionType;
-    private SequenceFile.Writer writer;
-    private LongWritable defaultKey; // used when recordkey is not defined
-
-    private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);
-
-    /**
-     * Constructs a SequenceFileAccessor.
-     *
-     * @param input all input parameters coming from the client request
-     */
-    public SequenceFileAccessor(InputData input) {
-        super(input, new SequenceFileInputFormat<Writable, Writable>());
-    }
-
-    /**
-     * Overrides virtual method to create specialized record reader
-     */
-    @Override
-    protected Object getReader(JobConf jobConf, InputSplit split)
-            throws IOException {
-        return new SequenceFileRecordReader<Object, Object>(jobConf, (FileSplit) split);
-    }
-
-    @Override
-    public boolean openForWrite() throws Exception {
-        FileSystem fs;
-        Path parent;
-        String fileName = inputData.getDataSource();
-        conf = new Configuration();
-
-        getCompressionCodec(inputData);
-        fileName = updateFileExtension(fileName, codec);
-
-        // construct the output stream
-        file = new Path(fileName);
-        fs = file.getFileSystem(conf);
-        fc = FileContext.getFileContext();
-        defaultKey = new LongWritable(inputData.getSegmentId());
-
-        if (fs.exists(file)) {
-            throw new IOException("file " + file
-                    + " already exists, can't write data");
-        }
-        parent = file.getParent();
-        if (!fs.exists(parent)) {
-            fs.mkdirs(parent);
-            Log.debug("Created new dir " + parent);
-        }
-
-        writer = null;
-        return true;
-    }
-
-    /**
-     * Compression: based on compression codec and compression type (default
-     * value RECORD). If there is no codec, compression type is ignored, and
-     * NONE is used.
-     *
-     * @param inputData - container where compression codec and type are held
-     */
-    private void getCompressionCodec(InputData inputData) {
-
-        String userCompressCodec = inputData.getUserProperty("COMPRESSION_CODEC");
-        String userCompressType = inputData.getUserProperty("COMPRESSION_TYPE");
-        String parsedCompressType = parseCompressionType(userCompressType);
-
-        compressionType = SequenceFile.CompressionType.NONE;
-        codec = null;
-        if (userCompressCodec != null) {
-            codec = HdfsUtilities.getCodec(conf, userCompressCodec);
-
-            try {
-                compressionType = CompressionType.valueOf(parsedCompressType);
-            } catch (IllegalArgumentException e) {
-                throw new IllegalArgumentException(
-                        "Illegal value for compression type " + "'"
-                                + parsedCompressType + "'");
-            }
-            if (compressionType == null) {
-                throw new IllegalArgumentException(
-                        "Compression type must be defined");
-            }
-
-            Log.debug("Compression ON: " + "compression codec: "
-                    + userCompressCodec + ", compression type: "
-                    + compressionType);
-        }
-    }
-
-    /*
-     * Parses compression type for sequence file. If null, default to RECORD.
-     * Allowed values: RECORD, BLOCK.
-     */
-    private String parseCompressionType(String compressType) {
-        final String COMPRESSION_TYPE_RECORD = "RECORD";
-        final String COMPRESSION_TYPE_BLOCK = "BLOCK";
-        final String COMPRESSION_TYPE_NONE = "NONE";
-
-        if (compressType == null) {
-            return COMPRESSION_TYPE_RECORD;
-        }
-
-        if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) {
-            throw new IllegalArgumentException(
-                    "Illegal compression type 'NONE'. "
-                            + "For disabling compression remove COMPRESSION_CODEC parameter.");
-        }
-
-        if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD)
-                && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
-            throw new IllegalArgumentException("Illegal compression type '"
-                    + compressType + "'");
-        }
-
-        return compressType.toUpperCase();
-    }
-
-    /*
-     * Returns fileName with the codec's file extension appended
-     */
-    private String updateFileExtension(String fileName, CompressionCodec codec) {
-
-        if (codec != null) {
-            fileName += codec.getDefaultExtension();
-        }
-        Log.debug("File name for write: " + fileName);
-        return fileName;
-    }
-
-    @Override
-    public boolean writeNextObject(OneRow onerow) throws IOException {
-        Writable value = (Writable) onerow.getData();
-        Writable key = (Writable) onerow.getKey();
-
-        // init writer on first approach here, based on onerow.getData type
-        // TODO: verify data is serializable.
-        if (writer == null) {
-            Class<? extends Writable> valueClass = value.getClass();
-            Class<? extends Writable> keyClass = (key == null) ? LongWritable.class
-                    : key.getClass();
-            // create writer - do not allow overwriting existing file
-            writer = SequenceFile.createWriter(fc, conf, file, keyClass,
-                    valueClass, compressionType, codec,
-                    new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
-        }
-
-        try {
-            writer.append((key == null) ? defaultKey : key, value);
-        } catch (IOException e) {
-            Log.error("Failed to write data to file: " + e.getMessage());
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public void closeForWrite() throws Exception {
-        if (writer != null) {
-            writer.sync();
-            /*
-             * From release 0.21.0 sync() is deprecated in favor of hflush(),
-             * which only guarantees that new readers will see all data written
-             * to that point, and hsync(), which makes a stronger guarantee that
-             * the operating system has flushed the data to disk (like POSIX
-             * fsync), although data may still be in the disk cache.
-             */
-            writer.hsync();
-            writer.close();
-        }
-    }
-
-    public CompressionType getCompressionType() {
-        return compressionType;
-    }
-
-    public CompressionCodec getCodec() {
-        return codec;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
deleted file mode 100644
index 7ace4c8..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadResolver;
-import com.pivotal.pxf.api.WriteResolver;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.pivotal.pxf.api.io.DataType.VARCHAR;
-
-/**
- * StringPassResolver handles "deserialization" and serialization of
- * String records. StringPassResolver implements IReadResolver and
- * IWriteResolver interfaces. Returns strings as-is.
- */
-public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver {
-    // for write
-    private OneRow oneRow;
-
-    /**
-     * Constructs a StringPassResolver.
-     *
-     * @param inputData input all input parameters coming from the client request
-     */
-    public StringPassResolver(InputData inputData) {
-        super(inputData);
-        oneRow = new OneRow();
-        this.inputData = inputData;
-    }
-
-    /**
-     * Returns a list of the fields of one record.
-     * Each record field is represented by a {@link OneField} item.
-     * OneField item contains two fields: an integer representing the field type and a Java
-     * Object representing the field value.
-     */
-    @Override
-    public List<OneField> getFields(OneRow onerow) {
-        /*
-         * This call forces a whole text line into a single varchar field and replaces
-         * the proper field separation code, which can be found in previous revisions.
-         * The reasons for doing so at this point are:
-         * 1. performance
-         * 2. desire to not replicate text parsing logic from the backend into java
-         */
-        List<OneField> record = new LinkedList<OneField>();
-        Object data = onerow.getData();
-        if (data instanceof ChunkWritable) {
-            record.add(new OneField(DataType.BYTEA.getOID(), ((ChunkWritable) data).box));
-        } else {
-            record.add(new OneField(VARCHAR.getOID(), data));
-        }
-        return record;
-    }
-
-    /**
-     * Creates a OneRow object from the singleton list.
-     */
-    @Override
-    public OneRow setFields(List<OneField> record) throws Exception {
-        if (((byte[]) record.get(0).val).length == 0) {
-            return null;
-        }
-
-        oneRow.setData(record.get(0).val);
-        return oneRow;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
deleted file mode 100644
index e9df907..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
+++ /dev/null
@@ -1,220 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.*;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
-import com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException;
-import com.pivotal.pxf.service.utilities.Utilities;
-
-import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.pivotal.pxf.api.io.DataType.*;
-
-/**
- * WritableResolver handles serialization and deserialization of records
- * that were serialized using Hadoop's Writable serialization framework.
- *
- * A field named 'recordkey' is treated as a key of the given row, and not as
- * part of the data schema. See {@link RecordkeyAdapter}.
- */
-public class WritableResolver extends Plugin implements ReadResolver, WriteResolver {
-    private static final int RECORDKEY_UNDEFINED = -1;
-    private static final Log LOG = LogFactory.getLog(WritableResolver.class);
-    private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
-    private int recordkeyIndex;
-    // reflection fields
-    private Object userObject = null;
-    private Field[] fields = null;
-
-
-    /**
-     * Constructs a WritableResolver.
-     *
-     * @param input all input parameters coming from the client
-     * @throws Exception if schema file is missing, cannot be found in
-     *                   classpath or fails to instantiate
-     */
-    public WritableResolver(InputData input) throws Exception {
-        super(input);
-
-        String schemaName = inputData.getUserProperty("DATA-SCHEMA");
-
-        /** Testing that the schema name was supplied by the user - schema is an optional property. */
-        if (schemaName == null) {
-            throw new DataSchemaException(SCHEMA_NOT_INDICATED, this.getClass().getName());
-        }
-
-        /** Testing that the schema resource exists. */
-        if (!isSchemaOnClasspath(schemaName)) {
-            throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
-        }
-
-        userObject = Utilities.createAnyInstance(schemaName);
-        fields = userObject.getClass().getDeclaredFields();
-        recordkeyIndex = (inputData.getRecordkeyColumn() == null)
-                ? RECORDKEY_UNDEFINED
-                        : inputData.getRecordkeyColumn().columnIndex();
-
-        // fields details:
-        if (LOG.isDebugEnabled()) {
-            for (int i = 0; i < fields.length; i++) {
-                Field field = fields[i];
-                String javaType = field.getType().getName();
-                boolean isPrivate = Modifier.isPrivate(field.getModifiers());
-
-                LOG.debug("Field #" + i + ", name: " + field.getName() +
-                        " type: " + javaType + ", " +
-                        (isArray(javaType) ? "Array" : "Primitive") + ", " +
-                        (isPrivate ? "Private" : "accessible") + " field");
-            }
-        }
-    }
-
-    private boolean isArray(String javaType) {
-        return (javaType.startsWith("[") && !"[B".equals(javaType));
-    }
-
-    @Override
-    public List<OneField> getFields(OneRow onerow) throws Exception {
-        userObject = onerow.getData();
-        List<OneField> record = new LinkedList<OneField>();
-
-        int currentIdx = 0;
-        for (Field field : fields) {
-            if (currentIdx == recordkeyIndex) {
-                currentIdx += recordkeyAdapter.appendRecordkeyField(record, inputData, onerow);
-            }
-
-            if (Modifier.isPrivate(field.getModifiers())) {
-                continue;
-            }
-
-            currentIdx += populateRecord(record, field);
-        }
-
-        return record;
-    }
-
-    int setArrayField(List<OneField> record, int dataType, Field reflectedField) throws IllegalAccessException {
-        Object array = reflectedField.get(userObject);
-        int length = Array.getLength(array);
-        for (int j = 0; j < length; j++) {
-            record.add(new OneField(dataType, Array.get(array, j)));
-        }
-        return length;
-    }
-
-    /*
-     * Given a java Object type, convert it to the corresponding output field
-     * type.
-     */
-    private DataType convertJavaToGPDBType(String type) {
-        if ("boolean".equals(type) || "[Z".equals(type)) {
-            return BOOLEAN;
-        }
-        if ("int".equals(type) || "[I".equals(type)) {
-            return INTEGER;
-        }
-        if ("double".equals(type) || "[D".equals(type)) {
-            return FLOAT8;
-        }
-        if ("java.lang.String".equals(type) || "[Ljava.lang.String;".equals(type)) {
-            return TEXT;
-        }
-        if ("float".equals(type) || "[F".equals(type)) {
-            return REAL;
-        }
-        if ("long".equals(type) || "[J".equals(type)) {
-            return BIGINT;
-        }
-        if ("[B".equals(type)) {
-            return BYTEA;
-        }
-        if ("short".equals(type) || "[S".equals(type)) {
-            return SMALLINT;
-        }
-        throw new UnsupportedTypeException("Type " + type + " is not supported by GPDBWritable");
-    }
-
-    int populateRecord(List<OneField> record, Field field) throws BadRecordException {
-        String javaType = field.getType().getName();
-        try {
-            DataType dataType = convertJavaToGPDBType(javaType);
-            if (isArray(javaType)) {
-                return setArrayField(record, dataType.getOID(), field);
-            }
-            record.add(new OneField(dataType.getOID(), field.get(userObject)));
-            return 1;
-        } catch (IllegalAccessException ex) {
-            throw new BadRecordException(ex);
-        }
-    }
-
-    /**
-     * Sets customWritable fields and creates a OneRow object.
-     */
-    @Override
-    public OneRow setFields(List<OneField> record) throws Exception {
-        Writable key = null;
-
-        int colIdx = 0;
-        for (Field field : fields) {
-            /*
-             * extract recordkey based on the column descriptor type
-             * and add to OneRow.key
-             */
-            if (colIdx == recordkeyIndex) {
-                key = recordkeyAdapter.convertKeyValue(record.get(colIdx).val);
-                colIdx++;
-            }
-
-            if (Modifier.isPrivate(field.getModifiers())) {
-                continue;
-            }
-
-            String javaType = field.getType().getName();
-            convertJavaToGPDBType(javaType);
-            if (isArray(javaType)) {
-                Object value = field.get(userObject);
-                int length = Array.getLength(value);
-                for (int j = 0; j < length; j++, colIdx++) {
-                    Array.set(value, j, record.get(colIdx).val);
-                }
-            } else {
-                field.set(userObject, record.get(colIdx).val);
-                colIdx++;
-            }
-        }
-
-        return new OneRow(key, userObject);
-    }
-
-    /*
-     * Tests whether the schema resource is a file like avro_schema.avsc, or a
-     * Java class, in which case we try to load the class by its name.
-     */
-    private boolean isSchemaOnClasspath(String resource) {
-        if (this.getClass().getClassLoader().getResource("/" + resource) != null) {
-            return true;
-        }
-
-        try {
-            Class.forName(resource);
-            return true;
-        } catch (ClassNotFoundException e) {
-            return false;
-        }
-    }
-}
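
For context on the DATA-SCHEMA class that WritableResolver instantiates and reflects over, a hypothetical minimal example (the class name and fields are assumptions; non-private fields of supported Java types map to columns in declaration order, a no-argument constructor is needed for reflective instantiation, and implementing org.apache.hadoop.io.Writable keeps the class usable with the Writable-based accessors):

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical schema class named via the DATA-SCHEMA option.
    public class CustomWritableSchema implements Writable {
        public int id;        // resolved as INTEGER
        public String name;   // resolved as TEXT
        public double score;  // resolved as FLOAT8

        public CustomWritableSchema() {
            // no-arg constructor for reflective instantiation
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeDouble(score);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
            score = in.readDouble();
        }
    }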

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java
deleted file mode 100644
index db0c5ea..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-/**
- * Thrown when there is a data schema problem detected by any plugin that
- * requires a schema.
- * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_ON_CLASSPATH} when the specified schema is missing from the CLASSPATH.
- * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_INDICATED} when a schema was required but was not specified in the pxf uri.
- */
-public class DataSchemaException extends RuntimeException {
-    public static enum MessageFmt {
-        SCHEMA_NOT_INDICATED("%s requires a data schema to be specified in the " +
-                             "pxf uri, but none was found. Please supply it " +
-                             "using the DATA-SCHEMA option."),
-        SCHEMA_NOT_ON_CLASSPATH("schema resource \"%s\" is not located on the classpath");
-
-        String format;
-
-        MessageFmt(String format) {
-            this.format = format;
-        }
-
-        public String getFormat() {
-            return format;
-        }
-    }
-
-    private MessageFmt msgFormat;
-
-    /**
-     * Constructs a DataSchemaException.
-     *
-     * @param msgFormat the message format
-     * @param msgArgs the message arguments
-     */
-    public DataSchemaException(MessageFmt msgFormat, String... msgArgs) {
-        super(String.format(msgFormat.getFormat(), (Object[]) msgArgs));
-        this.msgFormat = msgFormat;
-    }
-
-    public MessageFmt getMsgFormat() {
-        return msgFormat;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
deleted file mode 100644
index c7fe103..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ /dev/null
@@ -1,231 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import com.pivotal.pxf.service.utilities.Utilities;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.utilities.InputData;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.io.*;
-import java.util.List;
-
-/**
- * HdfsUtilities class exposes helper methods for PXF classes.
- */
-public class HdfsUtilities {
-    private static Log Log = LogFactory.getLog(HdfsUtilities.class);
-    private static Configuration config = new Configuration();
-    private static CompressionCodecFactory factory = new CompressionCodecFactory(
-            config);
-
-    /**
-     * Hdfs data sources are absolute data paths. Method ensures that dataSource
-     * begins with '/'.
-     *
-     * @param dataSource The HDFS path to a file or directory of interest.
-     *            Retrieved from the client request.
-     * @return an absolute data path
-     */
-    public static String absoluteDataPath(String dataSource) {
-        return (dataSource.charAt(0) == '/') ? dataSource : "/" + dataSource;
-    }
-
-    /*
-     * Helper routine to get a compression codec class
-     */
-    private static Class<? extends CompressionCodec> getCodecClass(Configuration conf,
-                                                                   String name) {
-
-        Class<? extends CompressionCodec> codecClass;
-        try {
-            codecClass = conf.getClassByName(name).asSubclass(
-                    CompressionCodec.class);
-        } catch (ClassNotFoundException e) {
-            throw new IllegalArgumentException("Compression codec " + name
-                    + " was not found.", e);
-        }
-        return codecClass;
-    }
-
-    /**
-     * Helper routine to get compression codec through reflection.
-     *
-     * @param conf configuration used for reflection
-     * @param name codec name
-     * @return generated CompressionCodec
-     */
-    public static CompressionCodec getCodec(Configuration conf, String name) {
-        return ReflectionUtils.newInstance(getCodecClass(conf, name), conf);
-    }
-
-    /**
-     * Helper routine to get compression codec class by path (file suffix).
-     *
-     * @param path path of file to get codec for
-     * @return matching codec class for the path. null if no codec is needed.
-     */
-    private static Class<? extends CompressionCodec> getCodecClassByPath(String path) {
-
-        Class<? extends CompressionCodec> codecClass = null;
-        CompressionCodec codec = factory.getCodec(new Path(path));
-        if (codec != null) {
-            codecClass = codec.getClass();
-        }
-        Log.debug((codecClass == null ? "No codec" : "Codec " + codecClass)
-                + " was found for file " + path);
-        return codecClass;
-    }
-
-    /**
-     * Returns true if the needed codec is splittable. If no codec is needed
-     * returns true as well.
-     *
-     * @param path path of the file to be read
-     * @return if the codec needed for reading the specified path is splittable.
-     */
-    public static boolean isSplittableCodec(Path path) {
-
-        final CompressionCodec codec = factory.getCodec(path);
-        if (null == codec) {
-            return true;
-        }
-
-        return codec instanceof SplittableCompressionCodec;
-    }
-
-    /**
-     * Checks whether requests should be handled in a single thread or not.
-     *
-     * @param dataDir hdfs path to the data source
-     * @param compCodec the fully qualified name of the compression codec
-     * @return if the request can be run in multi-threaded mode.
-     */
-    public static boolean isThreadSafe(String dataDir, String compCodec) {
-
-        Class<? extends CompressionCodec> codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass(
-                config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir);
-        /* bzip2 codec is not thread safe */
-        return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass));
-    }
-
-    /**
-     * Prepares byte serialization of a file split information (start, length,
-     * hosts) using {@link ObjectOutputStream}.
-     *
-     * @param fsp file split to be serialized
-     * @return byte serialization of fsp
-     * @throws IOException if I/O errors occur while writing to the underlying
-     *             stream
-     */
-    public static byte[] prepareFragmentMetadata(FileSplit fsp)
-            throws IOException {
-        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-        ObjectOutputStream objectStream = new ObjectOutputStream(
-                byteArrayStream);
-        objectStream.writeLong(fsp.getStart());
-        objectStream.writeLong(fsp.getLength());
-        objectStream.writeObject(fsp.getLocations());
-
-        return byteArrayStream.toByteArray();
-    }
-
-    /**
-     * Parses fragment metadata and return matching {@link FileSplit}.
-     *
-     * @param inputData request input data
-     * @return FileSplit with fragment metadata
-     */
-    public static FileSplit parseFragmentMetadata(InputData inputData) {
-        try {
-            byte[] serializedLocation = inputData.getFragmentMetadata();
-            if (serializedLocation == null) {
-                throw new IllegalArgumentException(
-                        "Missing fragment location information");
-            }
-
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(
-                    serializedLocation);
-            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
-
-            long start = objectStream.readLong();
-            long end = objectStream.readLong();
-
-            String[] hosts = (String[]) objectStream.readObject();
-
-            FileSplit fileSplit = new FileSplit(new Path(
-                    inputData.getDataSource()), start, end, hosts);
-
-            Log.debug("parsed file split: path " + inputData.getDataSource()
-                    + ", start " + start + ", end " + end + ", hosts "
-                    + ArrayUtils.toString(hosts));
-
-            return fileSplit;
-
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Exception while reading expected fragment metadata", e);
-        }
-    }
-
-    /**
-     * Accesses the Avro file through the "unsplittable" API just to get the
-     * schema. The splittable API (AvroInputFormat), which is the one we will be
-     * using to fetch the records, does not support getting the Avro schema yet.
-     *
-     * @param conf Hadoop configuration
-     * @param dataSource Avro file (i.e fileName.avro) path
-     * @return the Avro schema
-     * @throws IOException if an I/O error occurred while accessing the Avro schema file
-     */
-    public static Schema getAvroSchema(Configuration conf, String dataSource)
-            throws IOException {
-        FsInput inStream = new FsInput(new Path(dataSource), conf);
-        DatumReader<GenericRecord> dummyReader = new GenericDatumReader<>();
-        DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(
-                inStream, dummyReader);
-        Schema schema = dummyFileReader.getSchema();
-        dummyFileReader.close();
-        return schema;
-    }
-
-    /**
-     * Returns a string serialization of a list of fields. Fields of binary type
-     * (BYTEA) are converted to their octal representation to make sure they are
-     * relayed properly to the database.
-     *
-     * @param complexRecord list of fields to be stringified
-     * @param delimiter delimiter between fields
-     * @return string of serialized fields using delimiter
-     */
-    public static String toString(List<OneField> complexRecord, String delimiter) {
-        StringBuilder buff = new StringBuilder();
-        String delim = ""; // first iteration has no delimiter
-        for (OneField complex : complexRecord) {
-            if (complex.type == DataType.BYTEA.getOID()) {
-                /* Serialize the byte array as an octal string */
-                buff.append(delim);
-                Utilities.byteArrayToOctalString((byte[]) complex.val, buff);
-            } else {
-                buff.append(delim).append(complex.val);
-            }
-            delim = delimiter;
-        }
-        return buff.toString();
-    }
-}
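
For readers following the fragment-metadata code above: prepareFragmentMetadata() writes the split's start offset, length and host list through an ObjectOutputStream, and parseFragmentMetadata() reads them back in the same order to rebuild a FileSplit. Below is a minimal round-trip sketch under that reading; FragmentMetadataRoundTrip and the sample path are illustrative names, not part of this commit.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    public class FragmentMetadataRoundTrip {
        public static void main(String[] args) throws Exception {
            // Serialize in the same order prepareFragmentMetadata() uses: start, length, hosts.
            FileSplit original = new FileSplit(new Path("/data/sample.txt"), 0L, 134217728L,
                    new String[]{"host1", "host2"});
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeLong(original.getStart());
            out.writeLong(original.getLength());
            out.writeObject(original.getLocations());
            out.flush(); // make sure buffered primitives reach the byte array

            // Deserialize in the same order, as parseFragmentMetadata() does.
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            long start = in.readLong();
            long length = in.readLong();
            String[] hosts = (String[]) in.readObject();
            FileSplit restored = new FileSplit(new Path("/data/sample.txt"), start, length, hosts);

            System.out.println("restored split: start=" + restored.getStart()
                    + ", length=" + restored.getLength());
        }
    }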

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java
deleted file mode 100644
index 958495c..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-
-/**
- * PxfInputFormat is not intended to read a specific format, hence it implements
- * a dummy getRecordReader. Instead, its purpose is to invoke
- * FileInputFormat.getSplits from a single point in PXF and obtain splits that
- * are valid for the actual InputFormats, since all the InputFormats we use
- * inherit from FileInputFormat and do not override getSplits.
- */
-public class PxfInputFormat extends FileInputFormat {
-
-    @Override
-    public RecordReader getRecordReader(InputSplit split,
-                                        JobConf conf,
-                                        Reporter reporter) throws IOException {
-        throw new UnsupportedOperationException("PxfInputFormat should not be used for reading data, but only for obtaining the splits of a file");
-    }
-
-    /*
-     * Return true if this file can be split.
-     */
-    @Override
-    protected boolean isSplitable(FileSystem fs, Path filename) {
-        return HdfsUtilities.isSplittableCodec(filename);
-    }
-
-}
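
As the class comment above notes, PxfInputFormat exists only so that FileInputFormat.getSplits can be invoked from a single point; getRecordReader deliberately throws. A minimal sketch of that usage follows, assuming a hypothetical driver class in the same package as PxfInputFormat; SplitListing and the input path are illustrative names, not taken from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class SplitListing {
        public static void main(String[] args) throws Exception {
            JobConf jobConf = new JobConf(new Configuration(), SplitListing.class);
            // Only getSplits() is useful here; getRecordReader() would throw UnsupportedOperationException.
            FileInputFormat.setInputPaths(jobConf, new Path("/data/mydir"));
            PxfInputFormat format = new PxfInputFormat();
            InputSplit[] splits = format.getSplits(jobConf, 1);
            for (InputSplit split : splits) {
                System.out.println(split);
            }
        }
    }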

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
deleted file mode 100644
index a88ade0..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
-
-import java.util.List;
-
-/**
- * Adapter used for adding a recordkey field to a record's output {@code List<OneField>}.
- */
-public class RecordkeyAdapter {
-    private Log Log;
-
-    /*
-     * We need to transform record keys into Java primitive types.
-     * Since the key type is the same throughout the file, we resolve the type
-     * on the first call (for the first record) and then reuse the resulting
-     * extractor (a Java stand-in for a function pointer) for the remaining records.
-     */
-    private interface ValExtractor {
-        public Object get(Object key);
-    }
-
-    private ValExtractor extractor = null;
-
-    private interface ValConverter {
-        public Writable get(Object key);
-    }
-
-    private ValConverter converter = null;
-
-    /**
-     * Constructs a RecordkeyAdapter.
-     */
-    public RecordkeyAdapter() {
-        Log = LogFactory.getLog(RecordkeyAdapter.class);
-    }
-
-    /**
-     *  Adds the recordkey to the end of the passed-in recFields list.
-     *  <p>
-     *  This method also detects cases in which a record key is not supported
-     *  by the underlying source type and was therefore "illegally" requested.
-     *
-     * @param recFields existing list of record (non-key) fields and their values.
-     * @param input all input parameters coming from the client request
-     * @param onerow a row object which is used here in order to find out if
-     *        the given type supports recordkeys or not.
-     * @return 0 if record key not needed, or 1 if record key was appended
-     * @throws NoSuchFieldException when the given record type does not support
-     *         recordkeys
-     */
-    public int appendRecordkeyField(List<OneField> recFields,
-                                    InputData input,
-                                    OneRow onerow) throws NoSuchFieldException {
-
-        /*
-         * The user did not request the recordkey field in the
-         * CREATE EXTERNAL TABLE statement.
-         */
-        ColumnDescriptor recordkeyColumn = input.getRecordkeyColumn();
-        if (recordkeyColumn == null) {
-            return 0;
-        }
-
-        /*
-         * The recordkey was filled in by the file accessor during execution of
-         * readNextObject(). The current accessor implementations are
-         * SequenceFileAccessor, LineBreakAccessor and AvroFileAccessor from
-         * HdfsSplittableDataAccessor, and QuotedLineBreakAccessor from
-         * HdfsAtomicDataAccessor. For SequenceFileAccessor and LineBreakAccessor
-         * the recordkey is set, since it is returned by the
-         * SequenceFileRecordReader or LineRecordReader (for text files). Avro
-         * files, however, do not have keys, so the AvroRecordReader will not
-         * return a key and recordkey will be null. If the user specified a
-         * recordkey attribute in the CREATE EXTERNAL TABLE statement and reads
-         * from an Avro file, we throw an exception, since the Avro file has no
-         * keys. In the future, additional FileAccessor implementations will have
-         * to set recordkey during readNextObject(). Otherwise it stays null by
-         * default and we throw an exception here - that is, if we get here at
-         * all: a careful user will not specify recordkey in the CREATE EXTERNAL
-         * TABLE statement, and then we return from this function a few lines
-         * above.
-         */
-        Object recordkey = onerow.getKey();
-        if (recordkey == null) {
-            throw new NoSuchFieldException("Value for field \"recordkey\" was requested but the queried HDFS resource type does not support key");
-        }
-
-        OneField oneField = new OneField();
-        oneField.type = recordkeyColumn.columnTypeCode();
-        oneField.val = extractVal(recordkey);
-        recFields.add(oneField);
-        return 1;
-    }
-
-    /*
-     * Extracts a Java primitive value from the recordkey. If the key is a
-     * Writable implementation, we extract the value as a Java primitive. If the
-     * key is already a Java primitive, we return it as is. If it is an unknown
-     * type, we throw an exception.
-     */
-    private Object extractVal(Object key) {
-        if (extractor == null) {
-            extractor = initializeExtractor(key);
-        }
-
-        return extractor.get(key);
-    }
-
-    /*
-     * Initialize the extractor object based on the type of the recordkey
-     */
-    private ValExtractor initializeExtractor(Object key) {
-        if (key instanceof IntWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((IntWritable) key).get();
-                }
-            };
-        } else if (key instanceof ByteWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((ByteWritable) key).get();
-                }
-            };
-        } else if (key instanceof BooleanWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((BooleanWritable) key).get();
-                }
-            };
-        } else if (key instanceof DoubleWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((DoubleWritable) key).get();
-                }
-            };
-        } else if (key instanceof FloatWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((FloatWritable) key).get();
-                }
-            };
-        } else if (key instanceof LongWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((LongWritable) key).get();
-                }
-            };
-        } else if (key instanceof Text) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return (key).toString();
-                }
-            };
-        } else if (key instanceof VIntWritable) {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    return ((VIntWritable) key).get();
-                }
-            };
-        } else {
-            return new ValExtractor() {
-                @Override
-                public Object get(Object key) {
-                    throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
-                }
-            };
-        }
-    }
-
-    /**
-     * Converts a given key object to its matching Writable.
-     * Supported types: Integer, Byte, Boolean, Double, Float, Long, String.
-     * The type is only checked once, based on the first key; all subsequent
-     * calls must pass keys of the same type.
-     *
-     * @param key object to convert
-     * @return Writable object matching given key
-     */
-    public Writable convertKeyValue(Object key) {
-        if (converter == null) {
-            converter = initializeConverter(key);
-            Log.debug("converter initialized for type " + key.getClass() +
-                    " (key value: " + key + ")");
-        }
-
-        return converter.get(key);
-    }
-
-    private ValConverter initializeConverter(Object key) {
-
-        if (key instanceof Integer) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new IntWritable((Integer) key));
-                }
-            };
-        } else if (key instanceof Byte) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new ByteWritable((Byte) key));
-                }
-            };
-        } else if (key instanceof Boolean) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new BooleanWritable((Boolean) key));
-                }
-            };
-        } else if (key instanceof Double) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new DoubleWritable((Double) key));
-                }
-            };
-        } else if (key instanceof Float) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new FloatWritable((Float) key));
-                }
-            };
-        } else if (key instanceof Long) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new LongWritable((Long) key));
-                }
-            };
-        } else if (key instanceof String) {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    return (new Text((String) key));
-                }
-            };
-        } else {
-            return new ValConverter() {
-                @Override
-                public Writable get(Object key) {
-                    throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
-                }
-            };
-        }
-    }
-}
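
To illustrate the lazy, type-locked conversion described in the convertKeyValue Javadoc above: the first key initializes the converter, and every later key must have the same Java type. A small hypothetical usage sketch follows; RecordkeyAdapterDemo is an illustrative name and assumes the adapter class is on the classpath.

    import org.apache.hadoop.io.Writable;

    public class RecordkeyAdapterDemo {
        public static void main(String[] args) {
            RecordkeyAdapter adapter = new RecordkeyAdapter();
            // The first key initializes a Long -> LongWritable converter...
            Writable first = adapter.convertKeyValue(42L);
            // ...which is then reused, so later keys must also be Longs.
            Writable second = adapter.convertKeyValue(43L);
            System.out.println(first + ", " + second); // prints: 42, 43
        }
    }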

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java
new file mode 100644
index 0000000..fab51ca
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java
@@ -0,0 +1,77 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.*;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+
+import static org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
+
+/**
+ * A PXF accessor for reading Avro file records.
+ */
+public class AvroFileAccessor extends HdfsSplittableDataAccessor {
+    private AvroWrapper<GenericRecord> avroWrapper = null;
+
+    /**
+     * Constructs an AvroFileAccessor that creates the job configuration and
+     * accesses the Avro file to fetch the Avro schema.
+     *
+     * @param input all input parameters coming from the client
+     * @throws Exception if getting the Avro schema fails
+     */
+    public AvroFileAccessor(InputData input) throws Exception {
+        // 1. Call the base class
+        super(input, new AvroInputFormat<GenericRecord>());
+
+        // 2. Access the Avro file through the "unsplittable" API just to get the schema.
+        //    The splittable API (AvroInputFormat), which is the one we will use to fetch
+        //    the records, does not support getting the Avro schema yet.
+        Schema schema = getAvroSchema(conf, inputData.getDataSource());
+
+        // 3. Pass the schema to the AvroInputFormat
+        AvroJob.setInputSchema(jobConf, schema);
+
+        // 4. Create the avroWrapper required for the iteration
+        avroWrapper = new AvroWrapper<GenericRecord>();
+    }
+
+    @Override
+    protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+        return new AvroRecordReader<Object>(jobConf, (FileSplit) split);
+    }
+
+    /**
+     * Reads the next Avro record.
+     * The Avro accessor is currently the only specialized accessor that
+     * overrides this method. It does so because of the special
+     * AvroRecordReader.next() semantics (use of the AvroWrapper), which
+     * prevent it from using the RecordReader's default implementation in
+     * SplittableFileAccessor.
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        /* Reset the datum to null to avoid stale bytes from the previous row's datum being carried over */
+        avroWrapper.datum(null);
+        if (reader.next(avroWrapper, NullWritable.get())) { // There is one more record in the current split.
+            return new OneRow(null, avroWrapper.datum());
+        } else if (getNextSplit()) { // The current split is exhausted; try to move to the next split.
+            return reader.next(avroWrapper, NullWritable.get())
+                    ? new OneRow(null, avroWrapper.datum())
+                    : null;
+        }
+
+        // If neither condition was met, we have already read all the records in all the splits and
+        // no record was set in this call, so we return null to signal the end of the record
+        // sequence - at this point avroWrapper.datum() is null.
+        return null;
+    }
+}
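
For context on the readNextObject logic above: it wraps the AvroRecordReader/AvroWrapper iteration pattern from Avro's mapred API. A minimal standalone sketch of that underlying pattern follows, assuming a hypothetical AvroSplitDump class and a sample file path (both illustrative, not part of this commit).

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroRecordReader;
    import org.apache.avro.mapred.AvroWrapper;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class AvroSplitDump {
        public static void main(String[] args) throws Exception {
            JobConf jobConf = new JobConf();
            // A split that covers the whole file, just for illustration.
            FileSplit split = new FileSplit(new Path("/data/sample.avro"), 0L, Long.MAX_VALUE, new String[0]);
            AvroRecordReader<GenericRecord> reader = new AvroRecordReader<GenericRecord>(jobConf, split);
            AvroWrapper<GenericRecord> wrapper = new AvroWrapper<GenericRecord>();
            // The same next() contract the accessor relies on: the wrapper carries the datum.
            while (reader.next(wrapper, NullWritable.get())) {
                System.out.println(wrapper.datum());
            }
            reader.close();
        }
    }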