Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:10:02 UTC

[12/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace change

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java
new file mode 100644
index 0000000..3ce0d1b
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java
@@ -0,0 +1,148 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Record resolver for HBase.
+ *
+ * The class is responsible for converting rows from HBase scans (returned as {@link Result} objects)
+ * into a List of {@link OneField} objects.
+ * This also includes converting each HBase column's value into its HAWQ-assigned type.
+ *
+ * Currently, the class assumes all HBase values are stored as byte-encoded strings.
+ */
+public class HBaseResolver extends Plugin implements ReadResolver {
+    private HBaseTupleDescription tupleDescription;
+
+    /**
+     * Constructs a resolver and initializes the table's tuple description.
+     *
+     * @param input query information, contains HBase table name and filter
+     */
+    public HBaseResolver(InputData input) {
+        super(input);
+        tupleDescription = new HBaseTupleDescription(input);
+    }
+
+    /**
+     * Splits an HBase {@link Result} object into a list of {@link OneField} objects,
+     * based on the table's tuple description.
+     * Each field is converted from HBase bytes into its column description type.
+     *
+     * @param onerow row containing the HBase scan {@link Result}
+     * @return list of fields
+     * @throws Exception when a field's value cannot be converted
+     */
+    @Override
+    public List<OneField> getFields(OneRow onerow) throws Exception {
+        Result result = (Result) onerow.getData();
+        LinkedList<OneField> fields = new LinkedList<OneField>();
+
+        for (int i = 0; i < tupleDescription.columns(); ++i) {
+            HBaseColumnDescriptor column = tupleDescription.getColumn(i);
+            byte[] value;
+
+            if (column.isKeyColumn()) // if a row column is requested
+            {
+                value = result.getRow(); // just return the row key
+            } else // else, return column value
+            {
+                value = getColumnValue(result, column);
+            }
+
+            OneField oneField = new OneField();
+            oneField.type = column.columnTypeCode();
+            oneField.val = convertToJavaObject(oneField.type, column.columnTypeName(), value);
+            fields.add(oneField);
+        }
+        return fields;
+    }
+
+    /**
+     * Converts a given byte array value to the matching Java object, according to
+     * the given type code.
+     *
+     * @param typeCode ColumnDescriptor type id
+     * @param typeName type name. Used for error messages
+     * @param val value to be converted
+     * @return value converted to matching object type
+     * @throws Exception when conversion fails or type code is not supported
+     */
+    Object convertToJavaObject(int typeCode, String typeName, byte[] val) throws Exception {
+        if (val == null) {
+            return null;
+        }
+        try {
+            switch (DataType.get(typeCode)) {
+                case TEXT:
+                case VARCHAR:
+                case BPCHAR:
+                    return Bytes.toString(val);
+
+                case INTEGER:
+                    return Integer.parseInt(Bytes.toString(val));
+
+                case BIGINT:
+                    return Long.parseLong(Bytes.toString(val));
+
+                case SMALLINT:
+                    return Short.parseShort(Bytes.toString(val));
+
+                case REAL:
+                    return Float.parseFloat(Bytes.toString(val));
+
+                case FLOAT8:
+                    return Double.parseDouble(Bytes.toString(val));
+
+                case BYTEA:
+                    return val;
+
+                case BOOLEAN:
+                    return Boolean.valueOf(Bytes.toString(val));
+
+                case NUMERIC:
+                    return Bytes.toString(val);
+
+                case TIMESTAMP:
+                    return Timestamp.valueOf(Bytes.toString(val));
+
+                default:
+                    throw new UnsupportedTypeException("Unsupported data type " + typeName);
+            }
+        } catch (NumberFormatException e) {
+            throw new BadRecordException("Error converting value '" + Bytes.toString(val) + "' " +
+                    "to type " + typeName + ". " +
+                    "(original error: " + e.getMessage() + ")");
+        }
+    }
+
+    /**
+     * Returns the value of a column from a Result object.
+     *
+     * @param result HBase table row
+     * @param column HBase column to be retrieved
+     * @return HBase column value
+     */
+    byte[] getColumnValue(Result result, HBaseColumnDescriptor column) {
+        // if column does not contain a value, return null
+        if (!result.containsColumn(column.columnFamilyBytes(),
+                column.qualifierBytes())) {
+            return null;
+        }
+
+        // else, get the latest version of the requested column
+        Cell cell = result.getColumnLatestCell(column.columnFamilyBytes(), column.qualifierBytes());
+        return CellUtil.cloneValue(cell);
+    }
+}

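In the resolver above, the conversion path boils down to decoding the stored HBase bytes as a string and parsing that string into the HAWQ-assigned Java type. A minimal, self-contained sketch of that step (the class name ConversionSketch and the sample value are illustrative, not part of this patch):

    import org.apache.hadoop.hbase.util.Bytes;

    public class ConversionSketch {
        public static void main(String[] args) {
            // HBase hands back raw bytes; for a BIGINT column the resolver
            // decodes them as a string and parses that string into a Long.
            byte[] raw = Bytes.toBytes("1234");                // value as stored in HBase
            Long asLong = Long.parseLong(Bytes.toString(raw));
            System.out.println(asLong);                        // prints 1234
        }
    }

A value that fails to parse would surface as a BadRecordException, as convertToJavaObject above shows.
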
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
new file mode 100644
index 0000000..177cfa2
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
@@ -0,0 +1,82 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+
+/**
+ * {@link ColumnDescriptor} for HBase columns.
+ */
+public class HBaseColumnDescriptor extends ColumnDescriptor {
+    byte[] columnFamily;
+    byte[] qualifier;
+
+    /**
+     * Constructs a column descriptor using the given copy's column name.
+     *
+     * @param copy column descriptor to be copied
+     */
+    public HBaseColumnDescriptor(ColumnDescriptor copy) {
+        this(copy, copy.columnName().getBytes());
+    }
+
+    /**
+     * Constructs an HBase column descriptor from a generic column descriptor and an HBase column name.
+     * <p>
+     * The column name must be in either of the following forms:
+     * <ol>
+     * <li>columnfamily:qualifier - standard HBase column.</li>
+     * <li>recordkey - Row key column (case insensitive).</li>
+     * </ol>
+     * <p>
+     * For recordkey, no HBase name is created.
+     *
+     * @param copy column descriptor
+     * @param newColumnName HBase column name - can be different from the given column descriptor name.
+     */
+    public HBaseColumnDescriptor(ColumnDescriptor copy, byte[] newColumnName) {
+        super(copy);
+
+        if (isKeyColumn()) {
+            return;
+        }
+
+        int separatorIndex = getSeparatorIndex(newColumnName);
+
+        columnFamily = Arrays.copyOfRange(newColumnName, 0, separatorIndex);
+        qualifier = Arrays.copyOfRange(newColumnName, separatorIndex + 1, newColumnName.length);
+    }
+
+    /**
+     * Returns the column family name.
+     * (E.g. for "cf1:q2" returns "cf1")
+     *
+     * @return family column name
+     */
+    public byte[] columnFamilyBytes() {
+        return columnFamily;
+    }
+
+    /**
+     * Returns the column qualifier name.
+     * (E.g. for "cf1:q2" returns "q2")
+     *
+     * @return qualifier column name
+     */
+    public byte[] qualifierBytes() {
+        return qualifier;
+    }
+
+    private int getSeparatorIndex(byte[] columnName) {
+        for (int i = 0; i < columnName.length; ++i) {
+            if (columnName[i] == ':') {
+                return i;
+            }
+        }
+
+        throw new IllegalArgumentException("Illegal HBase column name " +
+                Bytes.toString(columnName) +
+                ", missing :");
+    }
+}

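For readers unfamiliar with the "family:qualifier" convention handled by this descriptor, the parsing amounts to splitting the HBase column name on the first ':' byte. A standalone sketch of the same split (ColumnNameSplitSketch is an illustrative name, not part of the patch):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class ColumnNameSplitSketch {
        public static void main(String[] args) {
            byte[] name = "cf1:q2".getBytes(StandardCharsets.UTF_8);

            // Find the first ':' separator, as getSeparatorIndex() does above.
            int sep = -1;
            for (int i = 0; i < name.length; i++) {
                if (name[i] == ':') { sep = i; break; }
            }
            if (sep < 0) {
                throw new IllegalArgumentException("Illegal HBase column name, missing :");
            }

            byte[] family = Arrays.copyOfRange(name, 0, sep);                  // "cf1"
            byte[] qualifier = Arrays.copyOfRange(name, sep + 1, name.length); // "q2"
            System.out.println(new String(family, StandardCharsets.UTF_8) + " / "
                    + new String(qualifier, StandardCharsets.UTF_8));
        }
    }
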
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
new file mode 100644
index 0000000..2da92db
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
@@ -0,0 +1,92 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This is a filter comparator for HBase. It is external to the PXF HBase code.
+ * <p>
+ * To use with HBase it must reside in the classpath of every region server.
+ * <p>
+ * It converts a value into {@link Long} before comparing.
+ * The filter is suitable for any integer numeric comparison, i.e. integer, bigint, smallint.
+ * <p>
+ * According to HBase 0.96 requirements, this must be serialized using Protocol Buffers
+ * ({@link #toByteArray()} and {@link #parseFrom(byte[])} methods).
+ * <p>
+ * A reference can be found in {@link SubstringComparator}.
+ */
+public class HBaseIntegerComparator extends ByteArrayComparable {
+	private Long val;
+
+
+	public HBaseIntegerComparator(Long inVal) {
+		super(Bytes.toBytes(inVal));
+		this.val = inVal;
+	}
+
+	/**
+	 * The comparison function. Currently uses {@link Long#parseLong(String)}.
+	 *
+	 * @return 0 if equal;
+	 *         a value less than 0 if the filter value is less than the row value;
+	 *         and a value greater than 0 if the filter value is greater than the row value.
+	 */
+	@Override
+	public int compareTo(byte[] value, int offset, int length) {
+		/**
+		 * Fix for HD-2610: query fails when recordkey is integer.
+		 */
+		if (length == 0)
+			return 1; // empty line, can't compare.
+
+		/**
+		 * TODO optimize by parsing the bytes directly.
+		 * Maybe we can even determine if it is an int or a string encoded.
+		 */
+		String valueAsString = new String(value, offset, length);
+		Long valueAsLong = Long.parseLong(valueAsString);
+		return val.compareTo(valueAsLong);
+	}
+
+	/**
+	 * Returns the comparator serialized using Protocol Buffers.
+	 *
+	 * @return serialized comparator
+	 */
+	@Override
+	public byte[] toByteArray() {
+		ComparatorProtos.ByteArrayComparable.Builder builder = ComparatorProtos.ByteArrayComparable.newBuilder();
+		builder.setValue(ByteString.copyFrom(getValue()));
+		return builder.build().toByteArray();
+	}
+
+	/**
+	 * Hides ("overrides") a static method in {@link ByteArrayComparable}.
+	 * This method will be called in deserialization.
+	 *
+	 * @param pbBytes
+	 *            A pb serialized instance
+	 * @return An instance of {@link HBaseIntegerComparator} made from
+	 *         <code>bytes</code>
+	 * @throws DeserializationException if deserialization of bytes to Protocol Buffers failed
+	 * @see #toByteArray
+	 */
+	public static ByteArrayComparable parseFrom(final byte[] pbBytes)
+			throws DeserializationException {
+		ComparatorProtos.ByteArrayComparable proto;
+		try {
+			proto = ComparatorProtos.ByteArrayComparable.parseFrom(pbBytes);
+		} catch (InvalidProtocolBufferException e) {
+			throw new DeserializationException(e);
+		}
+		return new HBaseIntegerComparator(Bytes.toLong(proto.getValue()
+				.toByteArray()));
+	}
+}

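To show how a comparator like this is typically wired into a scan, here is a hedged sketch using the standard SingleColumnValueFilter API. The column family "cf1" and qualifier "age" are made-up examples, and, as the Javadoc notes, the comparator class must also be on every region server's classpath:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseIntegerComparator;

    public class IntegerFilterSketch {
        public static Scan buildScan() {
            // Intended to match rows whose string-encoded integer in cf1:age is greater than 30.
            HBaseIntegerComparator comparator = new HBaseIntegerComparator(30L);
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("cf1"), Bytes.toBytes("age"),
                    CompareFilter.CompareOp.GREATER, comparator);

            Scan scan = new Scan();
            scan.setFilter(filter);
            return scan;
        }
    }
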
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java
new file mode 100644
index 0000000..dc2131c
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java
@@ -0,0 +1,183 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * HBaseLookupTable loads a table's lookup information
+ * from the HBase pxflookup table, if it exists.<br>
+ * This table holds mappings between HAWQ column names (key) and HBase column names (value).<br>
+ * E.g. for an HBase table "hbase_table", where <code>"hawq1"</code> is mapped to
+ * <code>"cf1:hbase1"</code> and <code>"hawq2"</code> is mapped to
+ * <code>"cf1:hbase2"</code>, the mappings will be:<br>
+ * <pre>
+ * 	ROW                     COLUMN+CELL
+ *  hbase_table             column=mapping:hawq1, value=cf1:hbase1
+ *  hbase_table             column=mapping:hawq2, value=cf1:hbase2
+ * </pre>
+ *
+ * Data is returned as a map of string and byte array from {@link #getMappings(String)}.
+ * <p>
+ * Once created, {@link #close()} MUST be called to cleanup resources.
+ */
+public class HBaseLookupTable implements Closeable {
+    private static final String LOOKUPTABLENAME = "pxflookup";
+    private static final byte[] LOOKUPCOLUMNFAMILY = Bytes.toBytes("mapping");
+
+    private static final Log LOG = LogFactory.getLog(HBaseLookupTable.class);
+
+    private Connection connection;
+    private Configuration hbaseConfiguration;
+    private Admin admin;
+    private Map<byte[], byte[]> rawTableMapping;
+    private Table lookupTable;
+
+    /**
+     * Constructs a connector to the HBase lookup table.
+     * Requires calling {@link #close()} to close the {@link Admin} instance.
+     *
+     * @param conf HBase configuration
+     * @throws Exception when initializing the HBase admin or connection fails
+     */
+    public HBaseLookupTable(Configuration conf) throws Exception {
+        hbaseConfiguration = conf;
+        connection = ConnectionFactory.createConnection(hbaseConfiguration);
+        admin = connection.getAdmin();
+        ClusterStatus cs = admin.getClusterStatus();
+        LOG.debug("HBase cluster has " + cs.getServersSize() + " region servers " +
+                "(" + cs.getDeadServers() + " dead)");
+    }
+
+    /**
+     * Returns mappings for the given table name between its HAWQ column names and
+     * HBase column names.
+     * If the lookup table doesn't exist or holds no mappings for the table, returns null.
+     * <p>
+     * All HAWQ column names are returned in lower case.
+     *
+     * @param tableName HBase table name
+     * @return mappings between HAWQ column names and HBase column names
+     * @throws IOException when HBase operations fail
+     */
+    public Map<String, byte[]> getMappings(String tableName) throws IOException {
+        if (!lookupTableValid()) {
+            return null;
+        }
+
+        loadTableMappings(tableName);
+
+        if (tableHasNoMappings()) {
+            return null;
+        }
+
+        return lowerCaseMappings();
+    }
+
+    /**
+     * Closes HBase resources. Must be called once this instance is no longer needed.
+     */
+    @Override
+    public void close() throws IOException {
+        admin.close();
+    }
+
+    /**
+     * Returns true if {@link #LOOKUPTABLENAME} is available and enabled.
+     *
+     * @return whether lookup table is valid
+     */
+    private boolean lookupTableValid() throws IOException {
+        return (HBaseUtilities.isTableAvailable(admin, LOOKUPTABLENAME) &&
+                lookupHasCorrectStructure());
+    }
+
+    /**
+     * Returns true if {@link #LOOKUPTABLENAME} has the {@link #LOOKUPCOLUMNFAMILY} column family.
+     *
+     * @return whether lookup has expected column family name
+     */
+    private boolean lookupHasCorrectStructure() throws IOException {
+        HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf(LOOKUPTABLENAME));
+        return htd.hasFamily(LOOKUPCOLUMNFAMILY);
+    }
+
+    /**
+     * Loads table name mappings from {@link #LOOKUPTABLENAME} lookup table.
+     *
+     * @param tableName table name
+     */
+    private void loadTableMappings(String tableName) throws IOException {
+        openLookupTable();
+        loadMappingMap(tableName);
+        closeLookupTable();
+    }
+
+    /**
+     * Returns true if lookup table has no relevant mappings.
+     * Should be called after {@link #loadMappingMap(String)}.
+     */
+    private boolean tableHasNoMappings() {
+        return MapUtils.isEmpty(rawTableMapping);
+    }
+
+    /**
+     * Returns a map of mappings between HAWQ and HBase column names,
+     * with the HAWQ column names (map keys) in lower case.
+     */
+    private Map<String, byte[]> lowerCaseMappings() {
+        Map<String, byte[]> lowCaseKeys = new HashMap<String, byte[]>();
+        for (Map.Entry<byte[], byte[]> entry : rawTableMapping.entrySet()) {
+            lowCaseKeys.put(lowerCase(entry.getKey()),
+                    entry.getValue());
+        }
+
+        return lowCaseKeys;
+    }
+
+    /**
+     * Loads the lookup table object from the open HBase connection.
+     */
+    private void openLookupTable() throws IOException {
+        lookupTable = connection.getTable(TableName.valueOf(LOOKUPTABLENAME));
+    }
+
+    /**
+     * Loads mappings for given table name from the lookup table {@link #LOOKUPTABLENAME}.
+     * The table name should be in the row key, and the family name should be {@link #LOOKUPCOLUMNFAMILY}.
+     *
+     * @param tableName HBase table name
+     * @throws IOException when HBase operations fail
+     */
+    private void loadMappingMap(String tableName) throws IOException {
+        Get lookupRow = new Get(Bytes.toBytes(tableName));
+        lookupRow.setMaxVersions(1);
+        lookupRow.addFamily(LOOKUPCOLUMNFAMILY);
+        Result row;
+
+        row = lookupTable.get(lookupRow);
+        rawTableMapping = row.getFamilyMap(LOOKUPCOLUMNFAMILY);
+        LOG.debug("lookup table mapping for " + tableName +
+                " has " + (rawTableMapping == null ? 0 : rawTableMapping.size()) + " entries");
+    }
+
+    private void closeLookupTable() throws IOException {
+        lookupTable.close();
+        HBaseUtilities.closeConnection(admin, connection);
+    }
+
+    private String lowerCase(byte[] key) {
+        return Bytes.toString(key).toLowerCase();
+    }
+}

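As a usage illustration of the pxflookup layout documented in the class Javadoc, the sketch below writes the example mapping row with the plain HBase client API. The table, family, and column names are taken from the Javadoc example; the sketch itself is not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LookupRowSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table lookup = connection.getTable(TableName.valueOf("pxflookup"))) {
                // Row key = HBase table name; one cell per HAWQ-to-HBase column mapping.
                Put mapping = new Put(Bytes.toBytes("hbase_table"));
                mapping.addColumn(Bytes.toBytes("mapping"),
                        Bytes.toBytes("hawq1"), Bytes.toBytes("cf1:hbase1"));
                mapping.addColumn(Bytes.toBytes("mapping"),
                        Bytes.toBytes("hawq2"), Bytes.toBytes("cf1:hbase2"));
                lookup.put(mapping);
            }
        }
    }
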
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
new file mode 100644
index 0000000..9150b8d
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
@@ -0,0 +1,120 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The class extends the tuple description provided by {@link InputData}
+ * for use with {@link HBaseColumnDescriptor}.
+ * <p>
+ * This class also loads the lookup table mappings sent (optionally) by the
+ * fragmenter.
+ */
+public class HBaseTupleDescription {
+    private Map<String, byte[]> tableMapping;
+    private List<HBaseColumnDescriptor> tupleDescription;
+    private InputData conf;
+
+    /**
+     * Constructs tuple description of the HBase table.
+     *
+     * @param conf data containing table tuple description
+     */
+    public HBaseTupleDescription(InputData conf) {
+        this.conf = conf;
+        parseHBaseTupleDescription();
+    }
+
+    /**
+     * Returns the number of fields.
+     *
+     * @return number of fields
+     */
+    public int columns() {
+        return tupleDescription.size();
+    }
+
+    /**
+     * Returns the column description at the given index.
+     *
+     * @param index index of the column to be returned
+     * @return column description
+     */
+    public HBaseColumnDescriptor getColumn(int index) {
+        return tupleDescription.get(index);
+    }
+
+    private void parseHBaseTupleDescription() {
+        tupleDescription = new ArrayList<HBaseColumnDescriptor>();
+        loadUserData();
+        createTupleDescription();
+    }
+
+    /**
+     * Loads user information from the fragmenter.
+     * The data contains optional table mappings from the lookup table,
+     * between field names in the HAWQ table and in the HBase table.
+     */
+    @SuppressWarnings("unchecked")
+    private void loadUserData() {
+        try {
+            byte[] serializedTableMappings = conf.getFragmentUserData();
+
+            // No userdata means no mappings for our table in lookup table
+            if (serializedTableMappings == null) {
+                return;
+            }
+
+            ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedTableMappings);
+            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
+            tableMapping = (Map<String, byte[]>) objectStream.readObject();
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while reading expected user data from HBase's fragmenter", e);
+        }
+    }
+
+    private void createTupleDescription() {
+        for (int i = 0; i < conf.getColumns(); ++i) {
+            ColumnDescriptor column = conf.getColumn(i);
+            tupleDescription.add(getHBaseColumn(column));
+        }
+    }
+
+    /**
+     * Returns the {@link HBaseColumnDescriptor} for the given column.
+     * If the column has a lookup table mapping, the HBase column name is used.
+     *
+     * @param column HAWQ column description
+     * @return matching HBase column description
+     */
+    private HBaseColumnDescriptor getHBaseColumn(ColumnDescriptor column) {
+        if (!column.isKeyColumn() && hasMapping(column)) {
+            return new HBaseColumnDescriptor(column, getMapping(column));
+        }
+        return new HBaseColumnDescriptor(column);
+    }
+
+    /**
+     * Returns true if there is a mapping for the given column name.
+     */
+    private boolean hasMapping(ColumnDescriptor column) {
+        return tableMapping != null &&
+                tableMapping.containsKey(column.columnName().toLowerCase());
+    }
+
+    /**
+     * Returns the HBase name mapping for the given column name.
+     *
+     * @param column HAWQ column description
+     * @return HBase name for the column
+     */
+    private byte[] getMapping(ColumnDescriptor column) {
+        return tableMapping.get(column.columnName().toLowerCase());
+    }
+}

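loadUserData() above expects the fragmenter to ship the lookup mappings as a Java-serialized Map<String, byte[]>. The fragmenter itself is not shown in this part of the patch, so the following is only a sketch of the expected wire format, using plain JDK serialization:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    public class UserDataSketch {
        // Serializes a column-name mapping the same way loadUserData()
        // expects to read it back (a Serializable map written with ObjectOutputStream).
        public static byte[] serializeMappings(Map<String, byte[]> mappings) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new HashMap<String, byte[]>(mappings));
            }
            return bytes.toByteArray();
        }
    }
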
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java
new file mode 100644
index 0000000..01f40bc
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java
@@ -0,0 +1,57 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+
+public class HBaseUtilities {
+
+    /**
+     * Initializes HBase configuration.
+     * The following parameters are edited:
+     *
+     * hbase.client.retries.number = 3
+     *  - limits the number of connection attempts to HBase before failing.
+     *
+     * @return HBase configuration
+     */
+    public static Configuration initHBaseConfiguration() {
+        Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.client.retries.number", "3");
+        return conf;
+    }
+
+    /**
+     * Returns whether the given table exists and is enabled.
+     *
+     * @param hbaseAdmin HBase admin, must be initialized
+     * @param tableName table name
+     * @return true if the table exists and is enabled
+     * @throws IOException if a remote or network exception occurs when connecting to HBase
+     */
+    public static boolean isTableAvailable(Admin hbaseAdmin, String tableName) throws IOException {
+        TableName name = TableName.valueOf(tableName);
+        return hbaseAdmin.isTableAvailable(name) &&
+                hbaseAdmin.isTableEnabled(name);
+    }
+
+    /**
+     * Closes HBase admin and connection if they are open.
+     *
+     * @param hbaseAdmin HBase admin
+     * @param hbaseConnection HBase connection
+     * @throws IOException if an I/O error occurs when connecting to HBase
+     */
+    public static void closeConnection(Admin hbaseAdmin, Connection hbaseConnection) throws IOException {
+        if (hbaseAdmin != null) {
+            hbaseAdmin.close();
+        }
+        if (hbaseConnection != null) {
+            hbaseConnection.close();
+        }
+    }
+}

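A typical calling sequence for these helpers might look like the sketch below; the wrapping method and class names are illustrative, not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseUtilities;

    public class UtilitiesUsageSketch {
        public static boolean tableReady(String tableName) throws IOException {
            Configuration conf = HBaseUtilities.initHBaseConfiguration();
            Connection connection = ConnectionFactory.createConnection(conf);
            Admin admin = connection.getAdmin();
            try {
                return HBaseUtilities.isTableAvailable(admin, tableName);
            } finally {
                HBaseUtilities.closeConnection(admin, connection);
            }
        }
    }
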
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java b/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java
deleted file mode 100644
index 51a6c02..0000000
--- a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({HBaseAccessor.class, HBaseConfiguration.class, ConnectionFactory.class})
-public class HBaseAccessorTest {
-    static final String tableName = "fishy_HBase_table";
-
-    InputData inputData;
-    HBaseTupleDescription tupleDescription;
-    Table table;
-    Scan scanDetails;
-    Configuration hbaseConfiguration;
-    Connection hbaseConnection;
-    HBaseAccessor accessor;
-
-    /*
-	 * After each test is done, close the accessor
-	 * if it was created
-	 */
-    @After
-    public void tearDown() throws Exception {
-        if (accessor == null) {
-            return;
-        }
-
-        closeAccessor();
-        accessor = null;
-    }
-
-	/*
-	 * Test construction of HBaseAccessor.
-	 * Actually no need for this as it is tested in all other tests
-	 * constructing HBaseAccessor but it serves as a simple example
-	 * of mocking
-	 *
-	 * HBaseAccessor is created and then HBaseTupleDescriptioncreation
-	 * is verified
-	 */
-    @Test
-    public void construction() throws Exception {
-        prepareConstruction();
-        HBaseAccessor accessor = new HBaseAccessor(inputData);
-        PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
-    }
-
-	/*
-	 * Test Open returns false when table has no regions
-	 *
-	 * Done by returning an empty Map from getRegionLocations
-	 * Verify Scan object doesn't contain any columns / filters
-	 * Verify scan did not start
-	 */
-    @Test
-    @Ignore
-    @SuppressWarnings("unchecked")
-    public void tableHasNoMetadata() throws Exception {
-        prepareConstruction();
-        prepareTableOpen();
-        prepareEmptyScanner();
-
-        when(inputData.getFragmentMetadata()).thenReturn(null);
-
-        accessor = new HBaseAccessor(inputData);
-        try {
-            accessor.openForRead();
-            fail("should throw no metadata exception");
-        } catch (Exception e) {
-            assertEquals("Missing fragment metadata information", e.getMessage());
-        }
-
-        verifyScannerDidNothing();
-    }
-
-    /*
-     * Helper for test setup.
-     * Creates a mock for HBaseTupleDescription and InputData
-     */
-    private void prepareConstruction() throws Exception {
-        inputData = mock(InputData.class);
-        tupleDescription = mock(HBaseTupleDescription.class);
-        PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDescription);
-    }
-
-    /*
-     * Helper for test setup.
-     * Adds a table name and prepares for table creation
-     */
-    private void prepareTableOpen() throws Exception {
-        // Set table name
-        when(inputData.getDataSource()).thenReturn(tableName);
-
-        // Make sure we mock static functions in HBaseConfiguration
-        PowerMockito.mockStatic(HBaseConfiguration.class);
-
-        hbaseConfiguration = mock(Configuration.class);
-        when(HBaseConfiguration.create()).thenReturn(hbaseConfiguration);
-
-        // Make sure we mock static functions in ConnectionFactory
-        PowerMockito.mockStatic(ConnectionFactory.class);
-        hbaseConnection = mock(Connection.class);
-        when(ConnectionFactory.createConnection(hbaseConfiguration)).thenReturn(hbaseConnection);
-        table = mock(Table.class);
-        when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
-    }
-
-    /*
-     * Helper for test setup.
-     * Sets zero columns (not realistic) and no filter
-     */
-    private void prepareEmptyScanner() throws Exception {
-        scanDetails = mock(Scan.class);
-        PowerMockito.whenNew(Scan.class).withNoArguments().thenReturn(scanDetails);
-
-        when(tupleDescription.columns()).thenReturn(0);
-        when(inputData.hasFilter()).thenReturn(false);
-    }
-
-    /*
-     * Verify Scan object was used but didn't do much
-     */
-    private void verifyScannerDidNothing() throws Exception {
-        // setMaxVersions was called with 1
-        verify(scanDetails).setMaxVersions(1);
-        // addColumn was not called
-        verify(scanDetails, never()).addColumn(any(byte[].class), any(byte[].class));
-        // addFilter was not called
-        verify(scanDetails, never()).setFilter(any(org.apache.hadoop.hbase.filter.Filter.class));
-        // Nothing else was missed
-        verifyNoMoreInteractions(scanDetails);
-        // Scanner was not used
-        verify(table, never()).getScanner(scanDetails);
-    }
-
-    /*
-     * Close the accessor and make sure table was closed
-     */
-    private void closeAccessor() throws Exception {
-        accessor.closeForRead();
-        verify(table).close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java b/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java
deleted file mode 100644
index eefb1d0..0000000
--- a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.BadRecordException;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({HBaseResolver.class})
-public class HBaseResolverTest {
-    InputData inputData;
-    HBaseTupleDescription tupleDesc;
-
-    @Test
-    /*
-	 * Test construction of HBaseResolver.
-	 * 
-	 * HBaseResolver is created and then HBaseTupleDescription 
-	 * creation is verified
-	 */
-    public void construction() throws Exception {
-        inputData = mock(InputData.class);
-        tupleDesc = mock(HBaseTupleDescription.class);
-        PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
-
-        HBaseResolver resolver = new HBaseResolver(inputData);
-        PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
-    }
-
-    @Test
-	/*
-	 * Test the convertToJavaObject method
-	 */
-    public void testConvertToJavaObject() throws Exception {
-        Object result;
-
-        inputData = mock(InputData.class);
-        tupleDesc = mock(HBaseTupleDescription.class);
-        PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
-
-        HBaseResolver resolver = new HBaseResolver(inputData);
-
-		/*
-		 * Supported type, No value.
-		 * Should successfully return Null.
-		 */
-        result = resolver.convertToJavaObject(20, "bigint", null);
-        assertNull(result);
-		
-		/*
-		 * Supported type, With value
-		 * Should successfully return a Java Object that holds original value
-		 */
-        result = resolver.convertToJavaObject(20, "bigint", "1234".getBytes());
-        assertEquals(((Long) result).longValue(), 1234L);
-		
-		/*
-		 * Supported type, Invalid value
-		 * Should throw a BadRecordException, with detailed explanation.
-		 */
-        try {
-            result = resolver.convertToJavaObject(20, "bigint", "not_a_numeral".getBytes());
-            fail("Supported type, Invalid value should throw an exception");
-        } catch (BadRecordException e) {
-            assertEquals("Error converting value 'not_a_numeral' to type bigint. (original error: For input string: \"not_a_numeral\")", e.getMessage());
-        } catch (Exception e) {
-            fail("Supported type, Invalid value expected to catch a BadRecordException, caught Exception");
-        }
-		
-		/*
-		 * Unsupported type
-		 * Should throw an Exception, indicating the name of the unsupported type
-		 */
-        try {
-            result = resolver.convertToJavaObject(600, "point", "[1,1]".getBytes());
-            fail("Unsupported data type should throw exception");
-        } catch (Exception e) {
-            assertEquals("Unsupported data type point", e.getMessage());
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java
new file mode 100644
index 0000000..c9119c7
--- /dev/null
+++ b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java
@@ -0,0 +1,160 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HBaseAccessor.class, HBaseConfiguration.class, ConnectionFactory.class})
+public class HBaseAccessorTest {
+    static final String tableName = "fishy_HBase_table";
+
+    InputData inputData;
+    HBaseTupleDescription tupleDescription;
+    Table table;
+    Scan scanDetails;
+    Configuration hbaseConfiguration;
+    Connection hbaseConnection;
+    HBaseAccessor accessor;
+
+    /*
+	 * After each test is done, close the accessor
+	 * if it was created
+	 */
+    @After
+    public void tearDown() throws Exception {
+        if (accessor == null) {
+            return;
+        }
+
+        closeAccessor();
+        accessor = null;
+    }
+
+	/*
+	 * Test construction of HBaseAccessor.
+	 * Strictly this is covered by the other tests that construct
+	 * HBaseAccessor, but it serves as a simple example of mocking.
+	 *
+	 * HBaseAccessor is created and then HBaseTupleDescription creation
+	 * is verified.
+	 */
+    @Test
+    public void construction() throws Exception {
+        prepareConstruction();
+        HBaseAccessor accessor = new HBaseAccessor(inputData);
+        PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
+    }
+
+	/*
+	 * Test openForRead fails when fragment metadata is missing
+	 *
+	 * Done by returning null from getFragmentMetadata
+	 * Verify Scan object doesn't contain any columns / filters
+	 * Verify scan did not start
+	 */
+    @Test
+    @Ignore
+    @SuppressWarnings("unchecked")
+    public void tableHasNoMetadata() throws Exception {
+        prepareConstruction();
+        prepareTableOpen();
+        prepareEmptyScanner();
+
+        when(inputData.getFragmentMetadata()).thenReturn(null);
+
+        accessor = new HBaseAccessor(inputData);
+        try {
+            accessor.openForRead();
+            fail("should throw no metadata exception");
+        } catch (Exception e) {
+            assertEquals("Missing fragment metadata information", e.getMessage());
+        }
+
+        verifyScannerDidNothing();
+    }
+
+    /*
+     * Helper for test setup.
+     * Creates a mock for HBaseTupleDescription and InputData
+     */
+    private void prepareConstruction() throws Exception {
+        inputData = mock(InputData.class);
+        tupleDescription = mock(HBaseTupleDescription.class);
+        PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDescription);
+    }
+
+    /*
+     * Helper for test setup.
+     * Adds a table name and prepares for table creation
+     */
+    private void prepareTableOpen() throws Exception {
+        // Set table name
+        when(inputData.getDataSource()).thenReturn(tableName);
+
+        // Make sure we mock static functions in HBaseConfiguration
+        PowerMockito.mockStatic(HBaseConfiguration.class);
+
+        hbaseConfiguration = mock(Configuration.class);
+        when(HBaseConfiguration.create()).thenReturn(hbaseConfiguration);
+
+        // Make sure we mock static functions in ConnectionFactory
+        PowerMockito.mockStatic(ConnectionFactory.class);
+        hbaseConnection = mock(Connection.class);
+        when(ConnectionFactory.createConnection(hbaseConfiguration)).thenReturn(hbaseConnection);
+        table = mock(Table.class);
+        when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+    }
+
+    /*
+     * Helper for test setup.
+     * Sets zero columns (not realistic) and no filter
+     */
+    private void prepareEmptyScanner() throws Exception {
+        scanDetails = mock(Scan.class);
+        PowerMockito.whenNew(Scan.class).withNoArguments().thenReturn(scanDetails);
+
+        when(tupleDescription.columns()).thenReturn(0);
+        when(inputData.hasFilter()).thenReturn(false);
+    }
+
+    /*
+     * Verify Scan object was used but didn't do much
+     */
+    private void verifyScannerDidNothing() throws Exception {
+        // setMaxVersions was called with 1
+        verify(scanDetails).setMaxVersions(1);
+        // addColumn was not called
+        verify(scanDetails, never()).addColumn(any(byte[].class), any(byte[].class));
+        // addFilter was not called
+        verify(scanDetails, never()).setFilter(any(org.apache.hadoop.hbase.filter.Filter.class));
+        // Nothing else was missed
+        verifyNoMoreInteractions(scanDetails);
+        // Scanner was not used
+        verify(table, never()).getScanner(scanDetails);
+    }
+
+    /*
+     * Close the accessor and make sure table was closed
+     */
+    private void closeAccessor() throws Exception {
+        accessor.closeForRead();
+        verify(table).close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java
new file mode 100644
index 0000000..82890a8
--- /dev/null
+++ b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java
@@ -0,0 +1,89 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HBaseResolver.class})
+public class HBaseResolverTest {
+    InputData inputData;
+    HBaseTupleDescription tupleDesc;
+
+    @Test
+    /*
+	 * Test construction of HBaseResolver.
+	 * 
+	 * HBaseResolver is created and then HBaseTupleDescription 
+	 * creation is verified
+	 */
+    public void construction() throws Exception {
+        inputData = mock(InputData.class);
+        tupleDesc = mock(HBaseTupleDescription.class);
+        PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
+
+        HBaseResolver resolver = new HBaseResolver(inputData);
+        PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
+    }
+
+    @Test
+	/*
+	 * Test the convertToJavaObject method
+	 */
+    public void testConvertToJavaObject() throws Exception {
+        Object result;
+
+        inputData = mock(InputData.class);
+        tupleDesc = mock(HBaseTupleDescription.class);
+        PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
+
+        HBaseResolver resolver = new HBaseResolver(inputData);
+
+		/*
+		 * Supported type, No value.
+		 * Should successfully return Null.
+		 */
+        result = resolver.convertToJavaObject(20, "bigint", null);
+        assertNull(result);
+		
+		/*
+		 * Supported type, With value
+		 * Should successfully return a Java Object that holds original value
+		 */
+        result = resolver.convertToJavaObject(20, "bigint", "1234".getBytes());
+        assertEquals(((Long) result).longValue(), 1234L);
+		
+		/*
+		 * Supported type, Invalid value
+		 * Should throw a BadRecordException, with detailed explanation.
+		 */
+        try {
+            result = resolver.convertToJavaObject(20, "bigint", "not_a_numeral".getBytes());
+            fail("Supported type, Invalid value should throw an exception");
+        } catch (BadRecordException e) {
+            assertEquals("Error converting value 'not_a_numeral' to type bigint. (original error: For input string: \"not_a_numeral\")", e.getMessage());
+        } catch (Exception e) {
+            fail("Supported type, Invalid value expected to catch a BadRecordException, caught Exception");
+        }
+		
+		/*
+		 * Unsupported type
+		 * Should throw an Exception, indicating the name of the unsupported type
+		 */
+        try {
+            result = resolver.convertToJavaObject(600, "point", "[1,1]".getBytes());
+            fail("Unsupported data type should throw exception");
+        } catch (Exception e) {
+            assertEquals("Unsupported data type point", e.getMessage());
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
deleted file mode 100644
index 1a667f4..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.*;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-
-import static com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
-
-/**
- * A PXF Accessor for reading Avro File records
- */
-public class AvroFileAccessor extends HdfsSplittableDataAccessor {
-    private AvroWrapper<GenericRecord> avroWrapper = null;
-
-    /**
-     * Constructs a AvroFileAccessor that creates the job configuration and
-     * accesses the avro file to fetch the avro schema
-     *
-     * @param input all input parameters coming from the client
-     * @throws Exception if getting the avro schema fails
-     */
-    public AvroFileAccessor(InputData input) throws Exception {
-        // 1. Call the base class
-        super(input, new AvroInputFormat<GenericRecord>());
-
-        // 2. Accessing the avro file through the "unsplittable" API just to get the schema.
-        //    The splittable API (AvroInputFormat) which is the one we will be using to fetch
-        //    the records, does not support getting the avro schema yet.
-        Schema schema = getAvroSchema(conf, inputData.getDataSource());
-
-        // 3. Pass the schema to the AvroInputFormat
-        AvroJob.setInputSchema(jobConf, schema);
-
-        // 4. The avroWrapper required for the iteration
-        avroWrapper = new AvroWrapper<GenericRecord>();
-    }
-
-    @Override
-    protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
-        return new AvroRecordReader<Object>(jobConf, (FileSplit) split);
-    }
-
-    /**
-     * readNextObject
-     * The AVRO accessor is currently the only specialized accessor that
-     * overrides this method. This happens, because the special
-     * AvroRecordReader.next() semantics (use of the AvroWrapper), so it
-     * cannot use the RecordReader's default implementation in
-     * SplittableFileAccessor
-     */
-    @Override
-    public OneRow readNextObject() throws IOException {
-        /** Resetting datum to null, to avoid stale bytes to be padded from the previous row's datum */
-        avroWrapper.datum(null);
-        if (reader.next(avroWrapper, NullWritable.get())) { // There is one more record in the current split.
-            return new OneRow(null, avroWrapper.datum());
-        } else if (getNextSplit()) { // The current split is exhausted. try to move to the next split.
-            return reader.next(avroWrapper, NullWritable.get())
-                    ? new OneRow(null, avroWrapper.datum())
-                    : null;
-        }
-
-        // if neither condition was met, it means we already read all the records in all the splits, and
-        // in this call record variable was not set, so we return null and thus we are signaling end of
-        // records sequence - in this case avroWrapper.datum() will be null
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
deleted file mode 100644
index 5249877..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
+++ /dev/null
@@ -1,389 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadResolver;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static com.pivotal.pxf.api.io.DataType.*;
-import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_INDICATED;
-import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_ON_CLASSPATH;
-import static com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
-
-/**
- * Class AvroResolver handles deserialization of records that were serialized
- * using the AVRO serialization framework.
- */
-public class AvroResolver extends Plugin implements ReadResolver {
-    private GenericRecord avroRecord = null;
-    private DatumReader<GenericRecord> reader = null;
-    // member kept to enable reuse, and thus avoid repeated allocation
-    private BinaryDecoder decoder = null;
-    private List<Schema.Field> fields = null;
-    private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
-    private static final String MAPKEY_DELIM = ":";
-    private static final String RECORDKEY_DELIM = ":";
-    private static final String COLLECTION_DELIM = ",";
-    private String collectionDelim;
-    private String mapkeyDelim;
-    private String recordkeyDelim;
-
-    /**
-     * Constructs an AvroResolver. Initializes Avro data structure: the Avro
-     * record - fields information and the Avro record reader. All Avro data is
-     * build from the Avro schema, which is based on the *.avsc file that was
-     * passed by the user
-     *
-     * @param input all input parameters coming from the client
-     * @throws IOException if Avro schema could not be retrieved or parsed
-     */
-    public AvroResolver(InputData input) throws IOException {
-        super(input);
-
-        Schema schema = isAvroFile() ? getAvroSchema(new Configuration(),
-                input.getDataSource())
-                : (new Schema.Parser()).parse(openExternalSchema());
-
-        reader = new GenericDatumReader<>(schema);
-        fields = schema.getFields();
-
-        collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
-                : input.getUserProperty("COLLECTION_DELIM");
-        mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
-                : input.getUserProperty("MAPKEY_DELIM");
-        recordkeyDelim = input.getUserProperty("RECORDKEY_DELIM") == null ? RECORDKEY_DELIM
-                : input.getUserProperty("RECORDKEY_DELIM");
-    }
-
-    /**
-     * Returns a list of the fields of one record. Each record field is
-     * represented by a OneField item. OneField item contains two fields: an
-     * integer representing the field type and a Java Object representing the
-     * field value.
-     */
-    @Override
-    public List<OneField> getFields(OneRow row) throws Exception {
-        avroRecord = makeAvroRecord(row.getData(), avroRecord);
-        List<OneField> record = new LinkedList<OneField>();
-
-        int recordkeyIndex = (inputData.getRecordkeyColumn() == null) ? -1
-                : inputData.getRecordkeyColumn().columnIndex();
-        int currentIndex = 0;
-
-        for (Schema.Field field : fields) {
-            /*
-             * Add the record key if exists
-             */
-            if (currentIndex == recordkeyIndex) {
-                currentIndex += recordkeyAdapter.appendRecordkeyField(record,
-                        inputData, row);
-            }
-
-            currentIndex += populateRecord(record,
-                    avroRecord.get(field.name()), field.schema());
-        }
-
-        return record;
-    }
-
-    /**
-     * Tests if the Avro records are residing inside an AVRO file. If the Avro
-     * records are not residing inside an AVRO file, then they may reside inside
-     * a sequence file, regular file, ...
-     *
-     * @return whether the resource is an Avro file
-     */
-    boolean isAvroFile() {
-        return inputData.getAccessor().toLowerCase().contains("avro");
-    }
-
-    /**
-     * The record can arrive from one of two different sources: a sequence
-     * file or an Avro file. If it comes from an Avro file, then it was already
-     * obtained as a {@link GenericRecord} when it was fetched from the file
-     * with the {@link AvroRecordReader}, so in this case a cast is enough.
-     * On the other hand, if the source is a sequence file, then the input
-     * parameter obj hides a byte[] buffer which is in fact one serialized
-     * Avro record. Here, we build the Avro record from the flat buffer, using
-     * the Avro API. Then (for both cases) in the remaining functions we build
-     * a {@code List<OneField>} record from the Avro record.
-     *
-     * @param obj object holding an Avro record
-     * @param reuseRecord Avro record to be reused to create new record from obj
-     * @return Avro record
-     * @throws IOException if creating the Avro record from byte array failed
-     */
-    GenericRecord makeAvroRecord(Object obj, GenericRecord reuseRecord)
-            throws IOException {
-        if (isAvroFile()) {
-            return (GenericRecord) obj;
-        } else {
-            byte[] bytes = ((BytesWritable) obj).getBytes();
-            decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
-            return reader.read(reuseRecord, decoder);
-        }
-    }
-
-    /**
-     * For a given field in the Avro record we extract its value and insert it
-     * into the output {@code List<OneField>} record. An Avro field can be a
-     * primitive type or a complex type (array, map, record, union or enum).
-     *
-     * @param record list of fields to be populated
-     * @param fieldValue field value
-     * @param fieldSchema field schema
-     * @return the number of populated fields
-     */
-    int populateRecord(List<OneField> record, Object fieldValue,
-                       Schema fieldSchema) {
-
-        Schema.Type fieldType = fieldSchema.getType();
-        int ret = 0;
-        Object value = fieldValue;
-
-        switch (fieldType) {
-            case ARRAY:
-                if(fieldValue == null) {
-                    return addOneFieldToRecord(record, TEXT, fieldValue);
-                }
-                List<OneField> listRecord = new LinkedList<>();
-                ret = setArrayField(listRecord, fieldValue, fieldSchema);
-                addOneFieldToRecord(record, TEXT, String.format("[%s]",
-                        HdfsUtilities.toString(listRecord, collectionDelim)));
-                break;
-            case MAP:
-                if(fieldValue == null) {
-                    return addOneFieldToRecord(record, TEXT, fieldValue);
-                }
-                List<OneField> mapRecord = new LinkedList<>();
-                ret = setMapField(mapRecord, fieldValue, fieldSchema);
-                addOneFieldToRecord(record, TEXT, String.format("{%s}",
-                        HdfsUtilities.toString(mapRecord, collectionDelim)));
-                break;
-            case RECORD:
-                if(fieldValue == null) {
-                    return addOneFieldToRecord(record, TEXT, fieldValue);
-                }
-                List<OneField> recRecord = new LinkedList<>();
-                ret = setRecordField(recRecord, fieldValue, fieldSchema);
-                addOneFieldToRecord(record, TEXT, String.format("{%s}",
-                        HdfsUtilities.toString(recRecord, collectionDelim)));
-                break;
-            case UNION:
-                /*
-                 * When an Avro field is actually a union, we resolve the type
-                 * of the union element, and delegate the record update via
-                 * recursion
-                 */
-                int unionIndex = GenericData.get().resolveUnion(fieldSchema,
-                        fieldValue);
-                /*
-                 * If the value is null, flip to the index of the non-null
-                 * data type in the union's type array.
-                 */
-                if (fieldValue == null) {
-                    unionIndex ^= 1;
-                }
-                ret = populateRecord(record, fieldValue,
-                        fieldSchema.getTypes().get(unionIndex));
-                break;
-            case ENUM:
-                ret = addOneFieldToRecord(record, TEXT, value);
-                break;
-            case INT:
-                ret = addOneFieldToRecord(record, INTEGER, value);
-                break;
-            case DOUBLE:
-                ret = addOneFieldToRecord(record, FLOAT8, value);
-                break;
-            case STRING:
-                value = (fieldValue != null) ? String.format("%s", fieldValue)
-                        : null;
-                ret = addOneFieldToRecord(record, TEXT, value);
-                break;
-            case FLOAT:
-                ret = addOneFieldToRecord(record, REAL, value);
-                break;
-            case LONG:
-                ret = addOneFieldToRecord(record, BIGINT, value);
-                break;
-            case BYTES:
-                ret = addOneFieldToRecord(record, BYTEA, value);
-                break;
-            case BOOLEAN:
-                ret = addOneFieldToRecord(record, BOOLEAN, value);
-                break;
-            case FIXED:
-                ret = addOneFieldToRecord(record, BYTEA, value);
-                break;
-            default:
-                break;
-        }
-        return ret;
-    }
-
-    /**
-     * When an Avro field is actually a record, we iterate through its fields.
-     * For each field, the name and value are added to a local
-     * {@code List<OneField>} complexRecord, joined with the record key
-     * delimiter, and the result is wrapped in a OneField object that is
-     * inserted into the output {@code List<OneField>} record.
-     *
-     * @param record list of fields to be populated
-     * @param value field value
-     * @param recSchema record schema
-     * @return number of populated fields
-     */
-    int setRecordField(List<OneField> record, Object value, Schema recSchema) {
-
-        GenericRecord rec = ((GenericData.Record) value);
-        Schema fieldKeySchema = Schema.create(Schema.Type.STRING);
-        int currentIndex = 0;
-        for (Schema.Field field : recSchema.getFields()) {
-            Schema fieldSchema = field.schema();
-            Object fieldValue = rec.get(field.name());
-            List<OneField> complexRecord = new LinkedList<>();
-            populateRecord(complexRecord, field.name(), fieldKeySchema);
-            populateRecord(complexRecord, fieldValue, fieldSchema);
-            addOneFieldToRecord(record, TEXT,
-                    HdfsUtilities.toString(complexRecord, recordkeyDelim));
-            currentIndex++;
-        }
-        return currentIndex;
-    }
-
-    /**
-     * When an Avro field is actually a map, we resolve the type of the map
-     * value. For each entry, the key and value are added to a local
-     * {@code List<OneField>} record, joined with the map key delimiter, and
-     * the result is wrapped in a OneField object that is inserted into the
-     * output {@code List<OneField>} record.
-     *
-     * The unchecked warning is suppressed to enable us to cast fieldValue to
-     * a Map (since the value schema has been identified to be of type map).
-     *
-     * @param record list of fields to be populated
-     * @param fieldValue field value
-     * @param mapSchema map schema
-     * @return number of populated fields
-     */
-    @SuppressWarnings("unchecked")
-    int setMapField(List<OneField> record, Object fieldValue, Schema mapSchema) {
-        Schema keySchema = Schema.create(Schema.Type.STRING);
-        Schema valueSchema = mapSchema.getValueType();
-        Map<String, ?> avroMap = ((Map<String, ?>) fieldValue);
-        for (Map.Entry<String, ?> entry : avroMap.entrySet()) {
-            List<OneField> complexRecord = new LinkedList<>();
-            populateRecord(complexRecord, entry.getKey(), keySchema);
-            populateRecord(complexRecord, entry.getValue(), valueSchema);
-            addOneFieldToRecord(record, TEXT,
-                    HdfsUtilities.toString(complexRecord, mapkeyDelim));
-        }
-        return avroMap.size();
-    }
-
-    /**
-     * When an Avro field is actually an array, we resolve the type of the array
-     * element, and for each element in the Avro array, we recursively invoke
-     * the population of {@code List<OneField>} record.
-     *
-     * @param record list of fields to be populated
-     * @param fieldValue field value
-     * @param arraySchema array schema
-     * @return number of populated fields
-     */
-    int setArrayField(List<OneField> record, Object fieldValue,
-                      Schema arraySchema) {
-        Schema typeSchema = arraySchema.getElementType();
-        GenericData.Array<?> array = (GenericData.Array<?>) fieldValue;
-        int length = array.size();
-        for (int i = 0; i < length; i++) {
-            populateRecord(record, array.get(i), typeSchema);
-        }
-        return length;
-    }
-
-    /**
-     * Creates the {@link OneField} object and adds it to the output {@code List<OneField>}
-     * record. Strings and byte arrays are held inside special types in the Avro
-     * record so we transfer them to standard types in order to enable their
-     * insertion in the GPDBWritable instance.
-     *
-     * @param record list of fields to be populated
-     * @param gpdbWritableType field type
-     * @param val field value
-     * @return 1 (number of populated fields)
-     */
-    int addOneFieldToRecord(List<OneField> record, DataType gpdbWritableType,
-                            Object val) {
-        OneField oneField = new OneField();
-        oneField.type = gpdbWritableType.getOID();
-        switch (gpdbWritableType) {
-            case BYTEA:
-                if (val instanceof ByteBuffer) {
-                    oneField.val = ((ByteBuffer) val).array();
-                } else {
-                    /*
-                     * The underlying byte array comes from an Avro Fixed
-                     * value.
-                     */
-                    oneField.val = ((GenericData.Fixed) val).bytes();
-                }
-                break;
-            default:
-                oneField.val = val;
-                break;
-        }
-
-        record.add(oneField);
-        return 1;
-    }
-
-    /**
-     * Opens Avro schema based on DATA-SCHEMA parameter.
-     *
-     * @return InputStream of schema file
-     * @throws DataSchemaException if schema file could not be opened
-     */
-    InputStream openExternalSchema() {
-
-        String schemaName = inputData.getUserProperty("DATA-SCHEMA");
-
-        /*
-         * Testing that the schema name was supplied by the user - schema is an
-         * optional property.
-         */
-        if (schemaName == null) {
-            throw new DataSchemaException(SCHEMA_NOT_INDICATED,
-                    this.getClass().getName());
-        }
-
-        /* Testing that the schema resource exists. */
-        if (this.getClass().getClassLoader().getResource(schemaName) == null) {
-            throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
-        }
-        ClassLoader loader = this.getClass().getClassLoader();
-        return loader.getResourceAsStream(schemaName);
-    }
-}
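
The makeAvroRecord() path in the resolver above rebuilds a GenericRecord from a flat byte[] buffer when the data comes from a sequence file rather than an Avro container file. The following minimal, self-contained sketch shows that byte[] round trip with the standard Avro APIs; the schema string, field names and class name are illustrative assumptions, not part of the PXF sources.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class AvroBytesRoundTrip {
        public static void main(String[] args) throws IOException {
            // Illustrative schema; PXF would load it from the Avro file itself
            // or from the *.avsc resource named by the DATA-SCHEMA property.
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"demo\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"long\"},"
                    + "{\"name\":\"name\",\"type\":\"string\"}]}");

            // Serialize one record to a flat byte[] - the shape a sequence
            // file value arrives in.
            GenericRecord rec = new GenericData.Record(schema);
            rec.put("id", 7L);
            rec.put("name", "alpha");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
            encoder.flush();

            // Rebuild the record from the buffer, reusing the decoder the way
            // the resolver reuses its member decoder to avoid reallocation.
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            GenericRecord decoded = reader.read(null, decoder);
            System.out.println(decoded.get("id") + " / " + decoded.get("name"));
        }
    }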

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
deleted file mode 100644
index 1873d7d..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
-
-/**
- * A class that provides a line reader from an input stream. Lines are
- * terminated by '\n' (LF); EOF also terminates an otherwise unterminated line.
- */
-public class ChunkReader implements Closeable {
-    public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-    private int bufferSize = DEFAULT_BUFFER_SIZE;
-    private InputStream in;
-    private byte[] buffer;
-    // the number of bytes of real data in the buffer
-    private int bufferLength = 0;
-    // the current position in the buffer
-    private int bufferPosn = 0;
-    private static final byte LF = '\n';
-
-    /**
-     * Constructs a ChunkReader instance
-     *
-     * @param in input stream
-     */
-    public ChunkReader(InputStream in) {
-        this.in = in;
-        this.buffer = new byte[this.bufferSize];
-    }
-
-    /**
-     * Closes the underlying stream.
-     */
-    @Override
-    public void close() throws IOException {
-        in.close();
-    }
-
-    /*
-     * Internal class used for holding part of a chunk brought by one read()
-     * operation on the input stream. We collect several such nodes in a list
-     * by doing several read operations until we reach the chunk size,
-     * maxBytesToConsume.
-     */
-    private class Node {
-        /* part of a chunk brought in a single inputstream.read() operation */
-        public byte[] slice;
-        /* the size of the slice */
-        public int len;
-    }
-
-    /**
-     * Reads data from the input stream in buffer-sized pieces, accumulating
-     * them until at least maxBytesToConsume bytes have been consumed.
-     *
-     * @param str - output parameter, will contain the read chunk byte array
-     * @param maxBytesToConsume - requested chunk size
-     * @return actual chunk size
-     * @throws IOException if the first byte cannot be read for any reason
-     *         other than the end of the file, if the input stream has been closed,
-     *         or if some other I/O error occurs.
-     */
-    public int readChunk(Writable str, int maxBytesToConsume)
-            throws IOException {
-        ChunkWritable cw = (ChunkWritable) str;
-        List<Node> list = new LinkedList<Node>();
-
-        long bytesConsumed = 0;
-
-        do {
-            if (bufferLength > 0) {
-                int remaining = bufferLength - bufferPosn;
-                Node nd = new Node();
-                nd.slice = new byte[remaining];
-                nd.len = remaining;
-                System.arraycopy(buffer, bufferPosn, nd.slice, 0, nd.len);
-                list.add(nd);
-                bytesConsumed += nd.len;
-            } else {
-                Node nd = new Node();
-                nd.slice = new byte[buffer.length];
-                nd.len = in.read(nd.slice);
-                if (nd.len <= 0) {
-                    break; // EOF
-                }
-                bytesConsumed += nd.len;
-                list.add(nd);
-            }
-
-            bufferLength = bufferPosn = 0;
-
-        } while (bytesConsumed < maxBytesToConsume);
-
-        if (list.size() > 0) {
-            cw.box = new byte[(int) bytesConsumed];
-            int pos = 0;
-            for (int i = 0; i < list.size(); i++) {
-                Node n = list.get(i);
-                System.arraycopy(n.slice, 0, cw.box, pos, n.len);
-                pos += n.len;
-            }
-        }
-
-        return (int) bytesConsumed;
-    }
-
-    /**
-     * Reads a line terminated by LF.
-     *
-     * @param str - output parameter, will contain the read record
-     * @param maxBytesToConsume - the line mustn't exceed this value
-     * @return length of the line read
-     * @throws IOException if the first byte cannot be read for any reason
-     *         other than the end of the file, if the input stream has been closed,
-     *         or if some other I/O error occurs.
-     */
-    public int readLine(Writable str, int maxBytesToConsume) throws IOException {
-        ChunkWritable cw = (ChunkWritable) str;
-        List<Node> list = new LinkedList<Node>();
-
-        boolean newLine = false; // whether a terminating newline was found
-        long bytesConsumed = 0;
-
-        do {
-            int startPosn = bufferPosn; // starting from where we left off the
-                                        // last time
-            if (bufferPosn >= bufferLength) {
-                startPosn = bufferPosn = 0;
-
-                bufferLength = in.read(buffer);
-                if (bufferLength <= 0) {
-                    break; // EOF
-                }
-            }
-
-            for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
-                                                              // newline
-                if (buffer[bufferPosn] == LF) {
-                    newLine = true;
-                    ++bufferPosn; // at next invocation proceed from following
-                                  // byte
-                    break;
-                }
-            }
-
-            int readLength = bufferPosn - startPosn;
-            bytesConsumed += readLength;
-
-            if (readLength > 0) {
-                Node nd = new Node();
-                nd.slice = new byte[readLength];
-                nd.len = readLength;
-                System.arraycopy(buffer, startPosn, nd.slice, 0, nd.len);
-                list.add(nd);
-            }
-        } while (!newLine && bytesConsumed < maxBytesToConsume);
-
-        if (list.size() > 0) {
-            cw.box = new byte[(int) bytesConsumed];
-            int pos = 0;
-            for (int i = 0; i < list.size(); i++) {
-                Node n = list.get(i);
-                System.arraycopy(n.slice, 0, cw.box, pos, n.len);
-                pos += n.len;
-            }
-        }
-
-        return (int) bytesConsumed;
-    }
-}
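
ChunkReader.readChunk() above gathers whatever each InputStream.read() call returns into Node slices and concatenates them once maxBytesToConsume is reached. The standalone sketch below reproduces the same accumulate-then-copy pattern against an in-memory stream; the class name, buffer size and sample data are illustrative, not taken from the PXF code.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;

    public class ChunkingSketch {
        // Read roughly maxBytes from the stream, gathering partial reads into
        // slices and concatenating them at the end - the same pattern
        // ChunkReader.readChunk() uses with its Node list.
        static byte[] readChunk(InputStream in, int maxBytes) throws IOException {
            List<byte[]> slices = new ArrayList<>();
            int total = 0;
            byte[] buffer = new byte[8]; // tiny buffer to force several reads
            while (total < maxBytes) {
                int n = in.read(buffer);
                if (n <= 0) {
                    break; // EOF
                }
                byte[] slice = new byte[n];
                System.arraycopy(buffer, 0, slice, 0, n);
                slices.add(slice);
                total += n;
            }
            byte[] box = new byte[total];
            int pos = 0;
            for (byte[] slice : slices) {
                System.arraycopy(slice, 0, box, pos, slice.length);
                pos += slice.length;
            }
            return box;
        }

        public static void main(String[] args) throws IOException {
            byte[] data = "line one\nline two\nline three\n".getBytes("UTF-8");
            byte[] chunk = readChunk(new ByteArrayInputStream(data), 20);
            System.out.println("read " + chunk.length + " bytes");
        }
    }

Like readChunk() itself, the loop may overshoot the requested size by up to one buffer, since the limit is only checked between reads.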

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
deleted file mode 100644
index 13a3546..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
-import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdfs.DFSInputStream;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * ChunkRecordReader is designed for fast reading of a file split. The idea is
- * to bring chunks of data instead of single records. The chunks contain many
- * records and the chunk end is not aligned on a record boundary. The size of
- * the chunk is a class hardcoded parameter - CHUNK_SIZE. This behaviour sets
- * this reader apart from the other readers which will fetch one record and stop
- * when reaching a record delimiter.
- */
-public class ChunkRecordReader implements
-        RecordReader<LongWritable, ChunkWritable> {
-    private static final Log LOG = LogFactory.getLog(ChunkRecordReader.class.getName());
-
-    private CompressionCodecFactory compressionCodecs = null;
-    private long start;
-    private long pos;
-    private long end;
-    private long fileLength;
-    private ChunkReader in;
-    private FSDataInputStream fileIn;
-    private final Seekable filePosition;
-    private int maxLineLength;
-    private CompressionCodec codec;
-    private Decompressor decompressor;
-    private static final int CHUNK_SIZE = 1024 * 1024;
-
-    /**
-     * Translates the FSDataInputStream into a DFSInputStream.
-     */
-    private DFSInputStream getInputStream() {
-        return (DFSInputStream) (fileIn.getWrappedStream());
-    }
-
-    /**
-     * Returns statistics of the input stream's read operations: total bytes
-     * read, bytes read locally, bytes read in short-circuit (directly from file
-     * descriptor).
-     *
-     * @return an instance of ReadStatistics class
-     */
-    public ReadStatistics getReadStatistics() {
-        return getInputStream().getReadStatistics();
-    }
-
-    /**
-     * Constructs a ChunkRecordReader instance.
-     *
-     * @param job the job configuration
-     * @param split contains the file name, begin byte of the split and the
-     *            bytes length
-     * @throws IOException if an I/O error occurs when accessing the file or
-     *             creating input stream to read from it
-     */
-    public ChunkRecordReader(Configuration job, FileSplit split)
-            throws IOException {
-        maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
-        validateLength(maxLineLength);
-        start = split.getStart();
-        end = start + split.getLength();
-        final Path file = split.getPath();
-        compressionCodecs = new CompressionCodecFactory(job);
-        codec = compressionCodecs.getCodec(file);
-
-        // open the file and seek to the start of the split
-        job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
-        final FileSystem fs = file.getFileSystem(job);
-        fs.setVerifyChecksum(false);
-        fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
-        fileLength = getInputStream().getFileLength();
-        if (isCompressedInput()) {
-            decompressor = CodecPool.getDecompressor(codec);
-            if (codec instanceof SplittableCompressionCodec) {
-                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
-                        fileIn, decompressor, start, end,
-                        SplittableCompressionCodec.READ_MODE.BYBLOCK);
-                in = new ChunkReader(cIn);
-                start = cIn.getAdjustedStart();
-                end = cIn.getAdjustedEnd();
-                filePosition = cIn; // take pos from compressed stream
-            } else {
-                in = new ChunkReader(codec.createInputStream(fileIn,
-                        decompressor));
-                filePosition = fileIn;
-            }
-        } else {
-            fileIn.seek(start);
-            in = new ChunkReader(fileIn);
-            filePosition = fileIn;
-        }
-        /*
-         * If this is not the first split, we always throw away first record
-         * because we always (except the last split) read one extra line in
-         * next() method.
-         */
-        if (start != 0) {
-            start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
-        }
-        this.pos = start;
-    }
-
-    /**
-     * Used by the client of this class to create the 'key' output parameter for
-     * next() method.
-     *
-     * @return an instance of LongWritable
-     */
-    @Override
-    public LongWritable createKey() {
-        return new LongWritable();
-    }
-
-    /**
-     * Used by the client of this class to create the 'value' output parameter
-     * for the next() method.
-     *
-     * @return an instance of ChunkWritable
-     */
-    @Override
-    public ChunkWritable createValue() {
-        return new ChunkWritable();
-    }
-
-    /**
-     * Fetches the next data chunk from the file split. The size of the chunk is
-     * a class hardcoded parameter - CHUNK_SIZE. This behaviour sets this reader
-     * apart from the other readers which will fetch one record and stop when
-     * reaching a record delimiter.
-     *
-     * @param key - output parameter. When the method returns, it contains the
-     *            key - the byte offset of the start of the chunk
-     * @param value - output parameter. When the method returns, it contains
-     *            the value - the chunk, a byte array inside the ChunkWritable
-     *            instance
-     * @return false - when end of split was reached
-     * @throws IOException if an I/O error occurred while reading the next chunk
-     *             or line
-     */
-    @Override
-    public synchronized boolean next(LongWritable key, ChunkWritable value)
-            throws IOException {
-        /*
-         * Usually a record is spread between the end of the current split and
-         * the beginning of the next split, so when reading the last record in
-         * the split we usually need to cross over to the next split. This
-         * tricky logic is implemented in ChunkReader.readLine(). In order not
-         * to rewrite this logic we read the last chunk in the split with
-         * readLine(). For a split of 120M, reading the last 1M line by line
-         * doesn't have a huge impact. We apply a factor to the last chunk to
-         * make sure we start before the last record.
-         */
-        float factor = 1.5f;
-        int limit = (int) (factor * CHUNK_SIZE);
-        long curPos = getFilePosition();
-        int newSize = 0;
-
-        while (curPos <= end) {
-            key.set(pos);
-
-            if ((end - curPos) > limit) {
-                newSize = in.readChunk(value, CHUNK_SIZE);
-            } else {
-                newSize = in.readLine(value,
-                        Math.max(maxBytesToConsume(pos), maxLineLength));
-            }
-            if (newSize == 0) {
-                break;
-            }
-
-            pos += newSize;
-
-            if (pos == fileLength) {
-                /* in case the text file's last character is not a linefeed */
-                if (value.box[value.box.length - 1] != '\n') {
-                    int newLen = value.box.length + 1;
-                    byte[] tmp = new byte[newLen];
-                    System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
-                    tmp[newLen - 1] = '\n';
-                    value.box = tmp;
-                }
-            }
-
-            return true;
-        }
-        /*
-         * if we got here, either newSize was 0 or curPos is bigger than end
-         */
-
-        return false;
-    }
-
-    /**
-     * Gets the progress within the split.
-     */
-    @Override
-    public synchronized float getProgress() throws IOException {
-        if (start == end) {
-            return 0.0f;
-        } else {
-            return Math.min(1.0f, (getFilePosition() - start)
-                    / (float) (end - start));
-        }
-    }
-
-    /**
-     * Returns the position of the unread tail of the file.
-     *
-     * @return pos - start byte of the unread tail of the file
-     */
-    @Override
-    public synchronized long getPos() throws IOException {
-        return pos;
-    }
-
-    /**
-     * Closes the input stream.
-     */
-    @Override
-    public synchronized void close() throws IOException {
-        try {
-            if (in != null) {
-                in.close();
-            }
-        } finally {
-            if (decompressor != null) {
-                CodecPool.returnDecompressor(decompressor);
-            }
-        }
-    }
-
-    private void validateLength(int maxLineLength) {
-        if (maxLineLength <= 0)
-            throw new IllegalArgumentException(
-                    "maxLineLength must be a positive value");
-    }
-
-    private boolean isCompressedInput() {
-        return (codec != null);
-    }
-
-    private int maxBytesToConsume(long pos) {
-        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
-                Integer.MAX_VALUE, end - pos);
-    }
-
-    private long getFilePosition() throws IOException {
-        long retVal;
-        if (isCompressedInput() && null != filePosition) {
-            retVal = filePosition.getPos();
-        } else {
-            retVal = pos;
-        }
-        return retVal;
-    }
-} // class ChunkRecordReader
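
A RecordReader such as the one above is normally driven by a createKey()/createValue()/next() loop. The sketch below shows that consumption pattern for ChunkRecordReader; the HDFS path and split bounds are placeholders, and running it assumes the PXF HDFS plugin classes and a reachable HDFS file are available.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.FileSplit;

    import java.io.IOException;

    public class ChunkReadLoop {
        public static void main(String[] args) throws IOException {
            // Placeholder file and split bounds; in PXF these come from the
            // fragmenter and accessor rather than being hard-coded.
            Path file = new Path("hdfs:///tmp/sample.txt");
            FileSplit split = new FileSplit(file, 0L, 128L * 1024 * 1024, (String[]) null);

            ChunkRecordReader reader = new ChunkRecordReader(new Configuration(), split);
            LongWritable key = reader.createKey();
            ChunkWritable value = reader.createValue();
            try {
                // Each next() returns roughly CHUNK_SIZE bytes; near the end of
                // the split it falls back to line-by-line reads so the last
                // record is not cut in half.
                while (reader.next(key, value)) {
                    System.out.println("chunk at byte " + key.get()
                            + ", length " + value.box.length);
                }
            } finally {
                reader.close();
            }
        }
    }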

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
deleted file mode 100644
index a0a8b17..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.lang.UnsupportedOperationException;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Just an output buffer for the ChunkRecordReader. It must implement Writable,
- * otherwise it will not fit into the next() interface method.
- */
-public class ChunkWritable implements Writable {
-	public byte [] box;
-
-	/**
-     * Serializes the fields of this object to <code>out</code>.
-     *
-     * @param out <code>DataOutput</code> to serialize this object into.
-     * @throws UnsupportedOperationException this function is not supported
-     */
-	@Override
-    public void write(DataOutput out)  {
-		throw new UnsupportedOperationException("ChunkWritable.write() is not implemented");
-    }
-
-    /**
-     * Deserializes the fields of this object from <code>in</code>.
-     * <p>For efficiency, implementations should attempt to re-use storage in the
-     * existing object where possible.</p>
-     *
-     * @param in <code>DataInput</code> to deserialize this object from.
-     * @throws UnsupportedOperationException  this function is not supported
-     */
-	@Override
-    public void readFields(DataInput in)  {
-		throw new UnsupportedOperationException("ChunkWritable.readFields() is not implemented");
-	}
-}
\ No newline at end of file
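
Because ChunkWritable is only ever handed around in memory between ChunkRecordReader and its caller, its Writable hooks deliberately fail fast instead of serializing anything. A small sketch of that behaviour (the sample bytes are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class ChunkWritableSketch {
        public static void main(String[] args) throws Exception {
            ChunkWritable cw = new ChunkWritable();
            cw.box = "raw chunk bytes\n".getBytes("UTF-8"); // filled by ChunkReader in practice

            // The Writable methods exist only to satisfy the RecordReader
            // interface; calling them throws rather than silently writing nothing.
            try {
                cw.write(new DataOutputStream(new ByteArrayOutputStream()));
            } catch (UnsupportedOperationException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }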