Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:10:02 UTC
[12/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace change
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java
new file mode 100644
index 0000000..3ce0d1b
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolver.java
@@ -0,0 +1,148 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseColumnDescriptor;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Record resolver for HBase.
+ *
+ * The class is responsible for converting rows from HBase scans (returned as {@link Result} objects)
+ * into a list of {@link OneField} objects.
+ * This includes converting each HBase column's value into its HAWQ-assigned type.
+ *
+ * Currently, the class assumes all HBase values are stored as strings, encoded as byte arrays.
+ */
+public class HBaseResolver extends Plugin implements ReadResolver {
+ private HBaseTupleDescription tupleDescription;
+
+ /**
+ * Constructs a resolver and initializes the table's tuple description.
+ *
+ * @param input query information, contains HBase table name and filter
+ */
+ public HBaseResolver(InputData input) {
+ super(input);
+ tupleDescription = new HBaseTupleDescription(input);
+ }
+
+ /**
+ * Splits an HBase {@link Result} object into a list of {@link OneField},
+ * based on the table's tuple description.
+ * Each field is converted from HBase bytes into its column description type.
+ *
+ * @return list of fields
+ */
+ @Override
+ public List<OneField> getFields(OneRow onerow) throws Exception {
+ Result result = (Result) onerow.getData();
+ LinkedList<OneField> fields = new LinkedList<OneField>();
+
+ for (int i = 0; i < tupleDescription.columns(); ++i) {
+ HBaseColumnDescriptor column = tupleDescription.getColumn(i);
+ byte[] value;
+
+ if (column.isKeyColumn()) // if a row column is requested
+ {
+ value = result.getRow(); // just return the row key
+ } else // else, return column value
+ {
+ value = getColumnValue(result, column);
+ }
+
+ OneField oneField = new OneField();
+ oneField.type = column.columnTypeCode();
+ oneField.val = convertToJavaObject(oneField.type, column.columnTypeName(), value);
+ fields.add(oneField);
+ }
+ return fields;
+ }
+
+ /**
+ * Converts the given byte array value to the matching Java object, according to
+ * the given type code.
+ *
+ * @param typeCode ColumnDescriptor type id
+ * @param typeName type name. Used for error messages
+ * @param val value to be converted
+ * @return value converted to matching object type
+ * @throws Exception when conversion fails or type code is not supported
+ */
+ Object convertToJavaObject(int typeCode, String typeName, byte[] val) throws Exception {
+ if (val == null) {
+ return null;
+ }
+ try {
+ switch (DataType.get(typeCode)) {
+ case TEXT:
+ case VARCHAR:
+ case BPCHAR:
+ return Bytes.toString(val);
+
+ case INTEGER:
+ return Integer.parseInt(Bytes.toString(val));
+
+ case BIGINT:
+ return Long.parseLong(Bytes.toString(val));
+
+ case SMALLINT:
+ return Short.parseShort(Bytes.toString(val));
+
+ case REAL:
+ return Float.parseFloat(Bytes.toString(val));
+
+ case FLOAT8:
+ return Double.parseDouble(Bytes.toString(val));
+
+ case BYTEA:
+ return val;
+
+ case BOOLEAN:
+ return Boolean.valueOf(Bytes.toString(val));
+
+ case NUMERIC:
+ return Bytes.toString(val);
+
+ case TIMESTAMP:
+ return Timestamp.valueOf(Bytes.toString(val));
+
+ default:
+ throw new UnsupportedTypeException("Unsupported data type " + typeName);
+ }
+ } catch (NumberFormatException e) {
+ throw new BadRecordException("Error converting value '" + Bytes.toString(val) + "' " +
+ "to type " + typeName + ". " +
+ "(original error: " + e.getMessage() + ")");
+ }
+ }
+
+ /**
+ * Returns the value of a column from a Result object.
+ *
+ * @param result HBase table row
+ * @param column HBase column to be retrieved
+ * @return HBase column value
+ */
+ byte[] getColumnValue(Result result, HBaseColumnDescriptor column) {
+ // if column does not contain a value, return null
+ if (!result.containsColumn(column.columnFamilyBytes(),
+ column.qualifierBytes())) {
+ return null;
+ }
+
+ // else, get the latest version of the requested column
+ Cell cell = result.getColumnLatestCell(column.columnFamilyBytes(), column.qualifierBytes());
+ return CellUtil.cloneValue(cell);
+ }
+}
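
For illustration, a minimal standalone sketch of the byte-to-Java conversion pattern that convertToJavaObject applies above; it assumes hbase-common is on the classpath and the sample values are hypothetical:

    import org.apache.hadoop.hbase.util.Bytes;
    import java.sql.Timestamp;

    public class ConversionSketch {
        public static void main(String[] args) {
            // HBase cell values are assumed to be string-encoded byte arrays
            byte[] intValue = Bytes.toBytes("1234");
            byte[] tsValue = Bytes.toBytes("2015-10-28 23:10:02");

            // INTEGER: decode the bytes to a string, then parse the number
            int asInt = Integer.parseInt(Bytes.toString(intValue));

            // TIMESTAMP: decode and let java.sql.Timestamp parse the string form
            Timestamp asTs = Timestamp.valueOf(Bytes.toString(tsValue));

            System.out.println(asInt + " / " + asTs);
        }
    }
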
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
new file mode 100644
index 0000000..177cfa2
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseColumnDescriptor.java
@@ -0,0 +1,82 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+
+/**
+ * {@link ColumnDescriptor} for HBase columns.
+ */
+public class HBaseColumnDescriptor extends ColumnDescriptor {
+ byte[] columnFamily;
+ byte[] qualifier;
+
+ /**
+ * Constructs a column descriptor using the given copy's column name.
+ *
+ * @param copy column descriptor to be copied
+ */
+ public HBaseColumnDescriptor(ColumnDescriptor copy) {
+ this(copy, copy.columnName().getBytes());
+ }
+
+ /**
+ * Constructs an HBase column descriptor from a generic column descriptor and an HBase column name.
+ * <p>
+ * The column name must be in either of the following forms:
+ * <ol>
+ * <li>columnfamily:qualifier - standard HBase column.</li>
+ * <li>recordkey - Row key column (case insensitive).</li>
+ * </ol>
+ * <p>
+ * For recordkey, no HBase name is created.
+ *
+ * @param copy column descriptor
+ * @param newColumnName HBase column name - can be different from the given column descriptor name.
+ */
+ public HBaseColumnDescriptor(ColumnDescriptor copy, byte[] newColumnName) {
+ super(copy);
+
+ if (isKeyColumn()) {
+ return;
+ }
+
+ int separatorIndex = getSeparatorIndex(newColumnName);
+
+ columnFamily = Arrays.copyOfRange(newColumnName, 0, separatorIndex);
+ qualifier = Arrays.copyOfRange(newColumnName, separatorIndex + 1, newColumnName.length);
+ }
+
+ /**
+ * Returns the column family name.
+ * (E.g. "cf1:q2" will return "cf1")
+ *
+ * @return column family name
+ */
+ public byte[] columnFamilyBytes() {
+ return columnFamily;
+ }
+
+ /**
+ * Returns the column qualifier name.
+ * (E.g. "cf1:q2" will return "q2")
+ *
+ * @return column qualifier name
+ */
+ public byte[] qualifierBytes() {
+ return qualifier;
+ }
+
+ private int getSeparatorIndex(byte[] columnName) {
+ for (int i = 0; i < columnName.length; ++i) {
+ if (columnName[i] == ':') {
+ return i;
+ }
+ }
+
+ throw new IllegalArgumentException("Illegal HBase column name " +
+ Bytes.toString(columnName) +
+ ", missing :");
+ }
+}
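
For illustration, a minimal standalone sketch of the "columnfamily:qualifier" split performed by the constructor above; the column name "cf1:q2" is hypothetical:

    import java.util.Arrays;

    public class ColumnNameSplitSketch {
        public static void main(String[] args) {
            byte[] name = "cf1:q2".getBytes();

            // locate the ':' separator, as getSeparatorIndex does
            int sep = -1;
            for (int i = 0; i < name.length; ++i) {
                if (name[i] == ':') { sep = i; break; }
            }

            byte[] family = Arrays.copyOfRange(name, 0, sep);                  // "cf1"
            byte[] qualifier = Arrays.copyOfRange(name, sep + 1, name.length); // "q2"
            System.out.println(new String(family) + " / " + new String(qualifier));
        }
    }
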
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
new file mode 100644
index 0000000..2da92db
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseIntegerComparator.java
@@ -0,0 +1,92 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This is a Filter comparator for HBase. It is external to the PXF HBase code.
+ * <p>
+ * To use with HBase it must reside in the classpath of every region server.
+ * <p>
+ * It converts a value into a {@link Long} before comparing.
+ * The filter is suitable for any integer numeric comparison, i.e. integer, bigint, smallint.
+ * <p>
+ * According to HBase 0.96 requirements, this comparator must be serialized using Protocol Buffers
+ * ({@link #toByteArray()} and {@link #parseFrom(byte[])} methods).
+ * <p>
+ * A reference can be found in {@link SubstringComparator}.
+ */
+public class HBaseIntegerComparator extends ByteArrayComparable {
+ private Long val;
+
+
+ public HBaseIntegerComparator(Long inVal) {
+ super(Bytes.toBytes(inVal));
+ this.val = inVal;
+ }
+
+ /**
+ * The comparison function. Currently uses {@link Long#parseLong(String)}.
+ *
+ * @return 0 if equal;
+ * a value less than 0 if row value is less than filter value;
+ * and a value greater than 0 if the row value is greater than the filter value.
+ */
+ @Override
+ public int compareTo(byte[] value, int offset, int length) {
+ /**
+ * Fix for HD-2610: query fails when recordkey is integer.
+ */
+ if (length == 0)
+ return 1; // empty value, can't compare.
+
+ /**
+ * TODO optimize by parsing the bytes directly.
+ * Maybe we can even determine if it is an int or a string encoded.
+ */
+ String valueAsString = new String(value, offset, length);
+ Long valueAsLong = Long.parseLong(valueAsString);
+ return val.compareTo(valueAsLong);
+ }
+
+ /**
+ * Returns the comparator serialized using Protocol Buffers.
+ *
+ * @return serialized comparator
+ */
+ @Override
+ public byte[] toByteArray() {
+ ComparatorProtos.ByteArrayComparable.Builder builder = ComparatorProtos.ByteArrayComparable.newBuilder();
+ builder.setValue(ByteString.copyFrom(getValue()));
+ return builder.build().toByteArray();
+ }
+
+ /**
+ * Hides ("overrides") a static method in {@link ByteArrayComparable}.
+ * This method is called during deserialization.
+ *
+ * @param pbBytes
+ * A pb serialized instance
+ * @return An instance of {@link HBaseIntegerComparator} made from
+ * <code>bytes</code>
+ * @throws DeserializationException if deserialization of bytes to Protocol Buffers failed
+ * @see #toByteArray
+ */
+ public static ByteArrayComparable parseFrom(final byte[] pbBytes)
+ throws DeserializationException {
+ ComparatorProtos.ByteArrayComparable proto;
+ try {
+ proto = ComparatorProtos.ByteArrayComparable.parseFrom(pbBytes);
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ return new HBaseIntegerComparator(Bytes.toLong(proto.getValue()
+ .toByteArray()));
+ }
+}
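
As a hedged sketch of how such a comparator could be attached to a scan (not part of this patch; the table, family and qualifier names are hypothetical, and the comparator jar must be on every region server's classpath as noted above):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseIntegerComparator;

    public class IntegerFilterSketch {
        public static Scan buildScan() {
            // match rows whose cf1:age value, parsed as a long, equals 42
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("cf1"), Bytes.toBytes("age"),
                    CompareFilter.CompareOp.EQUAL,
                    new HBaseIntegerComparator(42L));
            Scan scan = new Scan();
            scan.setFilter(filter);
            return scan;
        }
    }
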
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java
new file mode 100644
index 0000000..dc2131c
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseLookupTable.java
@@ -0,0 +1,183 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * HBaseLookupTable loads a table's lookup information
+ * from the HBase pxflookup table, if it exists.<br>
+ * This table holds mappings between HAWQ column names (key) and HBase column names (value).<br>
+ * E.g. for an HBase table "hbase_table", where <code>"hawq1"</code> is mapped to <code>"cf1:hbase1"</code>
+ * and <code>"hawq2"</code> is mapped to <code>"cf1:hbase2"</code>, the lookup entries will be:<br>
+ * <pre>
+ * ROW COLUMN+CELL
+ * hbase_table column=mapping:hawq1, value=cf1:hbase1
+ * hbase_table column=mapping:hawq2, value=cf1:hbase2
+ * </pre>
+ *
+ * Data is returned as a map of string and byte array from {@link #getMappings(String)}.
+ * <p>
+ * Once created, {@link #close()} MUST be called to clean up resources.
+ */
+public class HBaseLookupTable implements Closeable {
+ private static final String LOOKUPTABLENAME = "pxflookup";
+ private static final byte[] LOOKUPCOLUMNFAMILY = Bytes.toBytes("mapping");
+
+ private static final Log LOG = LogFactory.getLog(HBaseLookupTable.class);
+
+ private Connection connection;
+ private Configuration hbaseConfiguration;
+ private Admin admin;
+ private Map<byte[], byte[]> rawTableMapping;
+ private Table lookupTable;
+
+ /**
+ * Constructs a connector to the HBase lookup table.
+ * Requires calling {@link #close()} to close the {@link Admin} instance.
+ *
+ * @param conf HBase configuration
+ * @throws Exception when a connection to HBase cannot be established
+ */
+ public HBaseLookupTable(Configuration conf) throws Exception {
+ hbaseConfiguration = conf;
+ connection = ConnectionFactory.createConnection(hbaseConfiguration);
+ admin = connection.getAdmin();
+ ClusterStatus cs = admin.getClusterStatus();
+ LOG.debug("HBase cluster has " + cs.getServersSize() + " region servers " +
+ "(" + cs.getDeadServers() + " dead)");
+ }
+
+ /**
+ * Returns mappings for the given table name between its HAWQ column names and
+ * HBase column names.
+ * If the lookup table doesn't exist or has no mappings for the table, returns null.
+ * <p>
+ * All HAWQ column names are returned in lower case.
+ *
+ * @param tableName HBase table name
+ * @return mappings between HAWQ column names and HBase column names
+ * @throws IOException when HBase operations fail
+ */
+ public Map<String, byte[]> getMappings(String tableName) throws IOException {
+ if (!lookupTableValid()) {
+ return null;
+ }
+
+ loadTableMappings(tableName);
+
+ if (tableHasNoMappings()) {
+ return null;
+ }
+
+ return lowerCaseMappings();
+ }
+
+ /**
+ * Closes HBase resources. Must be called once this class is no longer needed.
+ */
+ @Override
+ public void close() throws IOException {
+ admin.close();
+ }
+
+ /**
+ * Returns true if {@link #LOOKUPTABLENAME} is available and enabled.
+ *
+ * @return whether lookup table is valid
+ */
+ private boolean lookupTableValid() throws IOException {
+ return (HBaseUtilities.isTableAvailable(admin, LOOKUPTABLENAME) &&
+ lookupHasCorrectStructure());
+ }
+
+ /**
+ * Returns true if {@link #LOOKUPTABLENAME} has {@value #LOOKUPCOLUMNFAMILY} family.
+ *
+ * @return whether lookup has expected column family name
+ */
+ private boolean lookupHasCorrectStructure() throws IOException {
+ HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf(LOOKUPTABLENAME));
+ return htd.hasFamily(LOOKUPCOLUMNFAMILY);
+ }
+
+ /**
+ * Loads table name mappings from {@link #LOOKUPTABLENAME} lookup table.
+ *
+ * @param tableName table name
+ */
+ private void loadTableMappings(String tableName) throws IOException {
+ openLookupTable();
+ loadMappingMap(tableName);
+ closeLookupTable();
+ }
+
+ /**
+ * Returns true if lookup table has no relevant mappings.
+ * Should be called after {@link #loadMappingMap(String)}.
+ */
+ private boolean tableHasNoMappings() {
+ return MapUtils.isEmpty(rawTableMapping);
+ }
+
+ /**
+ * Returns a map of mappings between HAWQ and HBase column names,
+ * with the HAWQ column names (keys) in lower case.
+ */
+ private Map<String, byte[]> lowerCaseMappings() {
+ Map<String, byte[]> lowCaseKeys = new HashMap<String, byte[]>();
+ for (Map.Entry<byte[], byte[]> entry : rawTableMapping.entrySet()) {
+ lowCaseKeys.put(lowerCase(entry.getKey()),
+ entry.getValue());
+ }
+
+ return lowCaseKeys;
+ }
+
+ /**
+ * Loads the lookup table object using the open connection.
+ */
+ private void openLookupTable() throws IOException {
+ lookupTable = connection.getTable(TableName.valueOf(LOOKUPTABLENAME));
+ }
+
+ /**
+ * Loads mappings for given table name from the lookup table {@link #LOOKUPTABLENAME}.
+ * The table name should be in the row key, and the family name should be {@link #LOOKUPCOLUMNFAMILY}.
+ *
+ * @param tableName HBase table name
+ * @throws IOException when HBase operations fail
+ */
+ private void loadMappingMap(String tableName) throws IOException {
+ Get lookupRow = new Get(Bytes.toBytes(tableName));
+ lookupRow.setMaxVersions(1);
+ lookupRow.addFamily(LOOKUPCOLUMNFAMILY);
+ Result row;
+
+ row = lookupTable.get(lookupRow);
+ rawTableMapping = row.getFamilyMap(LOOKUPCOLUMNFAMILY);
+ LOG.debug("lookup table mapping for " + tableName +
+ " has " + (rawTableMapping == null ? 0 : rawTableMapping.size()) + " entries");
+ }
+
+ private void closeLookupTable() throws IOException {
+ lookupTable.close();
+ HBaseUtilities.closeConnection(admin, connection);
+ }
+
+ private String lowerCase(byte[] key) {
+ return Bytes.toString(key).toLowerCase();
+ }
+}
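
To make the lookup-table layout in the class javadoc concrete, a hedged sketch of how such a mapping row could be inserted (not part of this patch; the table and column names are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LookupEntrySketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table lookup = connection.getTable(TableName.valueOf("pxflookup"))) {
                // row key: HBase table name; qualifier: HAWQ column name;
                // value: the "family:qualifier" it maps to
                Put put = new Put(Bytes.toBytes("hbase_table"));
                put.addColumn(Bytes.toBytes("mapping"), Bytes.toBytes("hawq1"),
                        Bytes.toBytes("cf1:hbase1"));
                lookup.put(put);
            }
        }
    }
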
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
new file mode 100644
index 0000000..9150b8d
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseTupleDescription.java
@@ -0,0 +1,120 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The class extends the tuple description provided by {@link InputData}
+ * for use with {@link HBaseColumnDescriptor}.
+ * <p>
+ * This class also loads the lookup table mappings sent (optionally) by the
+ * fragmenter.
+ */
+public class HBaseTupleDescription {
+ private Map<String, byte[]> tableMapping;
+ private List<HBaseColumnDescriptor> tupleDescription;
+ private InputData conf;
+
+ /**
+ * Constructs tuple description of the HBase table.
+ *
+ * @param conf data containing table tuple description
+ */
+ public HBaseTupleDescription(InputData conf) {
+ this.conf = conf;
+ parseHBaseTupleDescription();
+ }
+
+ /**
+ * Returns the number of fields.
+ *
+ * @return number of fields
+ */
+ public int columns() {
+ return tupleDescription.size();
+ }
+
+ /**
+ * Returns the column description at the given index.
+ *
+ * @param index column index to be returned
+ * @return column description
+ */
+ public HBaseColumnDescriptor getColumn(int index) {
+ return tupleDescription.get(index);
+ }
+
+ private void parseHBaseTupleDescription() {
+ tupleDescription = new ArrayList<HBaseColumnDescriptor>();
+ loadUserData();
+ createTupleDescription();
+ }
+
+ /**
+ * Loads user information from the fragmenter.
+ * The data contains optional table mappings from the lookup table,
+ * between field names in the HAWQ table and in the HBase table.
+ */
+ @SuppressWarnings("unchecked")
+ private void loadUserData() {
+ try {
+ byte[] serializedTableMappings = conf.getFragmentUserData();
+
+ // No userdata means no mappings for our table in lookup table
+ if (serializedTableMappings == null) {
+ return;
+ }
+
+ ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedTableMappings);
+ ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
+ tableMapping = (Map<String, byte[]>) objectStream.readObject();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while reading expected user data from HBase's fragmenter", e);
+ }
+ }
+
+ private void createTupleDescription() {
+ for (int i = 0; i < conf.getColumns(); ++i) {
+ ColumnDescriptor column = conf.getColumn(i);
+ tupleDescription.add(getHBaseColumn(column));
+ }
+ }
+
+ /**
+ * Returns the {@link HBaseColumnDescriptor} for the given column.
+ * If the column has a lookup table mapping, the HBase column name is used.
+ *
+ * @param column HAWQ column description
+ * @return matching HBase column description
+ */
+ private HBaseColumnDescriptor getHBaseColumn(ColumnDescriptor column) {
+ if (!column.isKeyColumn() && hasMapping(column)) {
+ return new HBaseColumnDescriptor(column, getMapping(column));
+ }
+ return new HBaseColumnDescriptor(column);
+ }
+
+ /**
+ * Returns true if there is a mapping for given column name.
+ */
+ private boolean hasMapping(ColumnDescriptor column) {
+ return tableMapping != null &&
+ tableMapping.containsKey(column.columnName().toLowerCase());
+ }
+
+ /**
+ * Returns the HBase name mapping for the given column name.
+ *
+ * @param column HAWQ column description
+ * @return HBase name for the column
+ */
+ private byte[] getMapping(ColumnDescriptor column) {
+ return tableMapping.get(column.columnName().toLowerCase());
+ }
+}
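
For illustration, a hedged sketch of the fragmenter-side counterpart of loadUserData above: serializing the mapping map into the byte array that would be carried as fragment user data (the class name and mapping values are hypothetical):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    public class FragmentUserDataSketch {
        public static byte[] serializeMappings() throws Exception {
            Map<String, byte[]> mappings = new HashMap<String, byte[]>();
            mappings.put("hawq1", "cf1:hbase1".getBytes());

            // write the map with Java serialization, mirroring the
            // ObjectInputStream read in loadUserData
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(mappings);
            out.close();
            return bytes.toByteArray();
        }
    }
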
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java
new file mode 100644
index 0000000..01f40bc
--- /dev/null
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/utilities/HBaseUtilities.java
@@ -0,0 +1,57 @@
+package org.apache.hawq.pxf.plugins.hbase.utilities;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+
+public class HBaseUtilities {
+
+ /**
+ * Initializes HBase configuration.
+ * The following parameters are edited:
+ *
+ * hbase.client.retries.number = 3
+ * - limits the number of connection attempts to HBase before failing.
+ *
+ * @return HBase configuration
+ */
+ public static Configuration initHBaseConfiguration() {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.client.retries.number", "3");
+ return conf;
+ }
+
+ /**
+ * Returns whether the given table exists and is enabled.
+ *
+ * @param hbaseAdmin HBase admin, must be initialized
+ * @param tableName table name
+ * @return true if table exists
+ * @throws IOException if a remote or network exception occurs when connecting to HBase
+ */
+ public static boolean isTableAvailable(Admin hbaseAdmin, String tableName) throws IOException {
+ TableName name = TableName.valueOf(tableName);
+ return hbaseAdmin.isTableAvailable(name) &&
+ hbaseAdmin.isTableEnabled(name);
+ }
+
+ /**
+ * Closes HBase admin and connection if they are open.
+ *
+ * @param hbaseAdmin HBase admin
+ * @param hbaseConnection HBase connection
+ * @throws IOException if an I/O error occurs when connecting to HBase
+ */
+ public static void closeConnection(Admin hbaseAdmin, Connection hbaseConnection) throws IOException {
+ if (hbaseAdmin != null) {
+ hbaseAdmin.close();
+ }
+ if (hbaseConnection != null) {
+ hbaseConnection.close();
+ }
+ }
+}
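
A minimal usage sketch of these helpers (not part of this patch; the table name queried is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseUtilities;

    public class HBaseUtilitiesUsageSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseUtilities.initHBaseConfiguration();
            Connection connection = ConnectionFactory.createConnection(conf);
            Admin admin = connection.getAdmin();
            try {
                boolean available = HBaseUtilities.isTableAvailable(admin, "pxflookup");
                System.out.println("pxflookup available and enabled: " + available);
            } finally {
                // closes both admin and connection, null-safe
                HBaseUtilities.closeConnection(admin, connection);
            }
        }
    }
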
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java b/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java
deleted file mode 100644
index 51a6c02..0000000
--- a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseAccessorTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({HBaseAccessor.class, HBaseConfiguration.class, ConnectionFactory.class})
-public class HBaseAccessorTest {
- static final String tableName = "fishy_HBase_table";
-
- InputData inputData;
- HBaseTupleDescription tupleDescription;
- Table table;
- Scan scanDetails;
- Configuration hbaseConfiguration;
- Connection hbaseConnection;
- HBaseAccessor accessor;
-
- /*
- * After each test is done, close the accessor
- * if it was created
- */
- @After
- public void tearDown() throws Exception {
- if (accessor == null) {
- return;
- }
-
- closeAccessor();
- accessor = null;
- }
-
- /*
- * Test construction of HBaseAccessor.
- * Actually no need for this as it is tested in all other tests
- * constructing HBaseAccessor but it serves as a simple example
- * of mocking
- *
- * HBaseAccessor is created and then HBaseTupleDescriptioncreation
- * is verified
- */
- @Test
- public void construction() throws Exception {
- prepareConstruction();
- HBaseAccessor accessor = new HBaseAccessor(inputData);
- PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
- }
-
- /*
- * Test Open returns false when table has no regions
- *
- * Done by returning an empty Map from getRegionLocations
- * Verify Scan object doesn't contain any columns / filters
- * Verify scan did not start
- */
- @Test
- @Ignore
- @SuppressWarnings("unchecked")
- public void tableHasNoMetadata() throws Exception {
- prepareConstruction();
- prepareTableOpen();
- prepareEmptyScanner();
-
- when(inputData.getFragmentMetadata()).thenReturn(null);
-
- accessor = new HBaseAccessor(inputData);
- try {
- accessor.openForRead();
- fail("should throw no metadata exception");
- } catch (Exception e) {
- assertEquals("Missing fragment metadata information", e.getMessage());
- }
-
- verifyScannerDidNothing();
- }
-
- /*
- * Helper for test setup.
- * Creates a mock for HBaseTupleDescription and InputData
- */
- private void prepareConstruction() throws Exception {
- inputData = mock(InputData.class);
- tupleDescription = mock(HBaseTupleDescription.class);
- PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDescription);
- }
-
- /*
- * Helper for test setup.
- * Adds a table name and prepares for table creation
- */
- private void prepareTableOpen() throws Exception {
- // Set table name
- when(inputData.getDataSource()).thenReturn(tableName);
-
- // Make sure we mock static functions in HBaseConfiguration
- PowerMockito.mockStatic(HBaseConfiguration.class);
-
- hbaseConfiguration = mock(Configuration.class);
- when(HBaseConfiguration.create()).thenReturn(hbaseConfiguration);
-
- // Make sure we mock static functions in ConnectionFactory
- PowerMockito.mockStatic(ConnectionFactory.class);
- hbaseConnection = mock(Connection.class);
- when(ConnectionFactory.createConnection(hbaseConfiguration)).thenReturn(hbaseConnection);
- table = mock(Table.class);
- when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
- }
-
- /*
- * Helper for test setup.
- * Sets zero columns (not realistic) and no filter
- */
- private void prepareEmptyScanner() throws Exception {
- scanDetails = mock(Scan.class);
- PowerMockito.whenNew(Scan.class).withNoArguments().thenReturn(scanDetails);
-
- when(tupleDescription.columns()).thenReturn(0);
- when(inputData.hasFilter()).thenReturn(false);
- }
-
- /*
- * Verify Scan object was used but didn't do much
- */
- private void verifyScannerDidNothing() throws Exception {
- // setMaxVersions was called with 1
- verify(scanDetails).setMaxVersions(1);
- // addColumn was not called
- verify(scanDetails, never()).addColumn(any(byte[].class), any(byte[].class));
- // addFilter was not called
- verify(scanDetails, never()).setFilter(any(org.apache.hadoop.hbase.filter.Filter.class));
- // Nothing else was missed
- verifyNoMoreInteractions(scanDetails);
- // Scanner was not used
- verify(table, never()).getScanner(scanDetails);
- }
-
- /*
- * Close the accessor and make sure table was closed
- */
- private void closeAccessor() throws Exception {
- accessor.closeForRead();
- verify(table).close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java b/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java
deleted file mode 100644
index eefb1d0..0000000
--- a/pxf/pxf-hbase/src/test/java/com/pivotal/pxf/plugins/hbase/HBaseResolverTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.pivotal.pxf.plugins.hbase;
-
-import com.pivotal.pxf.api.BadRecordException;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hbase.utilities.HBaseTupleDescription;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({HBaseResolver.class})
-public class HBaseResolverTest {
- InputData inputData;
- HBaseTupleDescription tupleDesc;
-
- @Test
- /*
- * Test construction of HBaseResolver.
- *
- * HBaseResolver is created and then HBaseTupleDescription
- * creation is verified
- */
- public void construction() throws Exception {
- inputData = mock(InputData.class);
- tupleDesc = mock(HBaseTupleDescription.class);
- PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
-
- HBaseResolver resolver = new HBaseResolver(inputData);
- PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
- }
-
- @Test
- /*
- * Test the convertToJavaObject method
- */
- public void testConvertToJavaObject() throws Exception {
- Object result;
-
- inputData = mock(InputData.class);
- tupleDesc = mock(HBaseTupleDescription.class);
- PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
-
- HBaseResolver resolver = new HBaseResolver(inputData);
-
- /*
- * Supported type, No value.
- * Should successfully return Null.
- */
- result = resolver.convertToJavaObject(20, "bigint", null);
- assertNull(result);
-
- /*
- * Supported type, With value
- * Should successfully return a Java Object that holds original value
- */
- result = resolver.convertToJavaObject(20, "bigint", "1234".getBytes());
- assertEquals(((Long) result).longValue(), 1234L);
-
- /*
- * Supported type, Invalid value
- * Should throw a BadRecordException, with detailed explanation.
- */
- try {
- result = resolver.convertToJavaObject(20, "bigint", "not_a_numeral".getBytes());
- fail("Supported type, Invalid value should throw an exception");
- } catch (BadRecordException e) {
- assertEquals("Error converting value 'not_a_numeral' to type bigint. (original error: For input string: \"not_a_numeral\")", e.getMessage());
- } catch (Exception e) {
- fail("Supported type, Invalid value expected to catch a BadRecordException, caught Exception");
- }
-
- /*
- * Unsupported type
- * Should throw an Exception, indicating the name of the unsupported type
- */
- try {
- result = resolver.convertToJavaObject(600, "point", "[1,1]".getBytes());
- fail("Unsupported data type should throw exception");
- } catch (Exception e) {
- assertEquals("Unsupported data type point", e.getMessage());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java
new file mode 100644
index 0000000..c9119c7
--- /dev/null
+++ b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseAccessorTest.java
@@ -0,0 +1,160 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HBaseAccessor.class, HBaseConfiguration.class, ConnectionFactory.class})
+public class HBaseAccessorTest {
+ static final String tableName = "fishy_HBase_table";
+
+ InputData inputData;
+ HBaseTupleDescription tupleDescription;
+ Table table;
+ Scan scanDetails;
+ Configuration hbaseConfiguration;
+ Connection hbaseConnection;
+ HBaseAccessor accessor;
+
+ /*
+ * After each test is done, close the accessor
+ * if it was created
+ */
+ @After
+ public void tearDown() throws Exception {
+ if (accessor == null) {
+ return;
+ }
+
+ closeAccessor();
+ accessor = null;
+ }
+
+ /*
+ * Test construction of HBaseAccessor.
+ * Not strictly needed, as it is exercised by every other test that
+ * constructs HBaseAccessor, but it serves as a simple example
+ * of mocking.
+ *
+ * HBaseAccessor is created and then HBaseTupleDescription creation
+ * is verified.
+ */
+ @Test
+ public void construction() throws Exception {
+ prepareConstruction();
+ HBaseAccessor accessor = new HBaseAccessor(inputData);
+ PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
+ }
+
+ /*
+ * Test Open returns false when table has no regions
+ *
+ * Done by returning an empty Map from getRegionLocations
+ * Verify Scan object doesn't contain any columns / filters
+ * Verify scan did not start
+ */
+ @Test
+ @Ignore
+ @SuppressWarnings("unchecked")
+ public void tableHasNoMetadata() throws Exception {
+ prepareConstruction();
+ prepareTableOpen();
+ prepareEmptyScanner();
+
+ when(inputData.getFragmentMetadata()).thenReturn(null);
+
+ accessor = new HBaseAccessor(inputData);
+ try {
+ accessor.openForRead();
+ fail("should throw no metadata exception");
+ } catch (Exception e) {
+ assertEquals("Missing fragment metadata information", e.getMessage());
+ }
+
+ verifyScannerDidNothing();
+ }
+
+ /*
+ * Helper for test setup.
+ * Creates a mock for HBaseTupleDescription and InputData
+ */
+ private void prepareConstruction() throws Exception {
+ inputData = mock(InputData.class);
+ tupleDescription = mock(HBaseTupleDescription.class);
+ PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDescription);
+ }
+
+ /*
+ * Helper for test setup.
+ * Adds a table name and prepares for table creation
+ */
+ private void prepareTableOpen() throws Exception {
+ // Set table name
+ when(inputData.getDataSource()).thenReturn(tableName);
+
+ // Make sure we mock static functions in HBaseConfiguration
+ PowerMockito.mockStatic(HBaseConfiguration.class);
+
+ hbaseConfiguration = mock(Configuration.class);
+ when(HBaseConfiguration.create()).thenReturn(hbaseConfiguration);
+
+ // Make sure we mock static functions in ConnectionFactory
+ PowerMockito.mockStatic(ConnectionFactory.class);
+ hbaseConnection = mock(Connection.class);
+ when(ConnectionFactory.createConnection(hbaseConfiguration)).thenReturn(hbaseConnection);
+ table = mock(Table.class);
+ when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+ }
+
+ /*
+ * Helper for test setup.
+ * Sets zero columns (not realistic) and no filter
+ */
+ private void prepareEmptyScanner() throws Exception {
+ scanDetails = mock(Scan.class);
+ PowerMockito.whenNew(Scan.class).withNoArguments().thenReturn(scanDetails);
+
+ when(tupleDescription.columns()).thenReturn(0);
+ when(inputData.hasFilter()).thenReturn(false);
+ }
+
+ /*
+ * Verify Scan object was used but didn't do much
+ */
+ private void verifyScannerDidNothing() throws Exception {
+ // setMaxVersions was called with 1
+ verify(scanDetails).setMaxVersions(1);
+ // addColumn was not called
+ verify(scanDetails, never()).addColumn(any(byte[].class), any(byte[].class));
+ // addFilter was not called
+ verify(scanDetails, never()).setFilter(any(org.apache.hadoop.hbase.filter.Filter.class));
+ // Nothing else was missed
+ verifyNoMoreInteractions(scanDetails);
+ // Scanner was not used
+ verify(table, never()).getScanner(scanDetails);
+ }
+
+ /*
+ * Close the accessor and make sure table was closed
+ */
+ private void closeAccessor() throws Exception {
+ accessor.closeForRead();
+ verify(table).close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java
new file mode 100644
index 0000000..82890a8
--- /dev/null
+++ b/pxf/pxf-hbase/src/test/java/org/apache/hawq/pxf/plugins/hbase/HBaseResolverTest.java
@@ -0,0 +1,89 @@
+package org.apache.hawq.pxf.plugins.hbase;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseTupleDescription;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HBaseResolver.class})
+public class HBaseResolverTest {
+ InputData inputData;
+ HBaseTupleDescription tupleDesc;
+
+ @Test
+ /*
+ * Test construction of HBaseResolver.
+ *
+ * HBaseResolver is created and then HBaseTupleDescription
+ * creation is verified
+ */
+ public void construction() throws Exception {
+ inputData = mock(InputData.class);
+ tupleDesc = mock(HBaseTupleDescription.class);
+ PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
+
+ HBaseResolver resolver = new HBaseResolver(inputData);
+ PowerMockito.verifyNew(HBaseTupleDescription.class).withArguments(inputData);
+ }
+
+ @Test
+ /*
+ * Test the convertToJavaObject method
+ */
+ public void testConvertToJavaObject() throws Exception {
+ Object result;
+
+ inputData = mock(InputData.class);
+ tupleDesc = mock(HBaseTupleDescription.class);
+ PowerMockito.whenNew(HBaseTupleDescription.class).withArguments(inputData).thenReturn(tupleDesc);
+
+ HBaseResolver resolver = new HBaseResolver(inputData);
+
+ /*
+ * Supported type, No value.
+ * Should successfully return Null.
+ */
+ result = resolver.convertToJavaObject(20, "bigint", null);
+ assertNull(result);
+
+ /*
+ * Supported type, With value
+ * Should successfully return a Java Object that holds original value
+ */
+ result = resolver.convertToJavaObject(20, "bigint", "1234".getBytes());
+ assertEquals(((Long) result).longValue(), 1234L);
+
+ /*
+ * Supported type, Invalid value
+ * Should throw a BadRecordException, with detailed explanation.
+ */
+ try {
+ result = resolver.convertToJavaObject(20, "bigint", "not_a_numeral".getBytes());
+ fail("Supported type, Invalid value should throw an exception");
+ } catch (BadRecordException e) {
+ assertEquals("Error converting value 'not_a_numeral' to type bigint. (original error: For input string: \"not_a_numeral\")", e.getMessage());
+ } catch (Exception e) {
+ fail("Supported type, Invalid value expected to catch a BadRecordException, caught Exception");
+ }
+
+ /*
+ * Unsupported type
+ * Should throw an Exception, indicating the name of the unsupported type
+ */
+ try {
+ result = resolver.convertToJavaObject(600, "point", "[1,1]".getBytes());
+ fail("Unsupported data type should throw exception");
+ } catch (Exception e) {
+ assertEquals("Unsupported data type point", e.getMessage());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
deleted file mode 100644
index 1a667f4..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroFileAccessor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.*;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-
-import static com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
-
-/**
- * A PXF Accessor for reading Avro File records
- */
-public class AvroFileAccessor extends HdfsSplittableDataAccessor {
- private AvroWrapper<GenericRecord> avroWrapper = null;
-
- /**
- * Constructs a AvroFileAccessor that creates the job configuration and
- * accesses the avro file to fetch the avro schema
- *
- * @param input all input parameters coming from the client
- * @throws Exception if getting the avro schema fails
- */
- public AvroFileAccessor(InputData input) throws Exception {
- // 1. Call the base class
- super(input, new AvroInputFormat<GenericRecord>());
-
- // 2. Accessing the avro file through the "unsplittable" API just to get the schema.
- // The splittable API (AvroInputFormat) which is the one we will be using to fetch
- // the records, does not support getting the avro schema yet.
- Schema schema = getAvroSchema(conf, inputData.getDataSource());
-
- // 3. Pass the schema to the AvroInputFormat
- AvroJob.setInputSchema(jobConf, schema);
-
- // 4. The avroWrapper required for the iteration
- avroWrapper = new AvroWrapper<GenericRecord>();
- }
-
- @Override
- protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
- return new AvroRecordReader<Object>(jobConf, (FileSplit) split);
- }
-
- /**
- * readNextObject
- * The AVRO accessor is currently the only specialized accessor that
- * overrides this method. This happens, because the special
- * AvroRecordReader.next() semantics (use of the AvroWrapper), so it
- * cannot use the RecordReader's default implementation in
- * SplittableFileAccessor
- */
- @Override
- public OneRow readNextObject() throws IOException {
- /** Resetting datum to null, to avoid stale bytes to be padded from the previous row's datum */
- avroWrapper.datum(null);
- if (reader.next(avroWrapper, NullWritable.get())) { // There is one more record in the current split.
- return new OneRow(null, avroWrapper.datum());
- } else if (getNextSplit()) { // The current split is exhausted. try to move to the next split.
- return reader.next(avroWrapper, NullWritable.get())
- ? new OneRow(null, avroWrapper.datum())
- : null;
- }
-
- // if neither condition was met, it means we already read all the records in all the splits, and
- // in this call record variable was not set, so we return null and thus we are signaling end of
- // records sequence - in this case avroWrapper.datum() will be null
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
deleted file mode 100644
index 5249877..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/AvroResolver.java
+++ /dev/null
@@ -1,389 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.ReadResolver;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hdfs.utilities.RecordkeyAdapter;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static com.pivotal.pxf.api.io.DataType.*;
-import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_INDICATED;
-import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.SCHEMA_NOT_ON_CLASSPATH;
-import static com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema;
-
-/**
- * Class AvroResolver handles deserialization of records that were serialized
- * using the AVRO serialization framework.
- */
-public class AvroResolver extends Plugin implements ReadResolver {
- private GenericRecord avroRecord = null;
- private DatumReader<GenericRecord> reader = null;
- // member kept to enable reuse, and thus avoid repeated allocation
- private BinaryDecoder decoder = null;
- private List<Schema.Field> fields = null;
- private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter();
- private static final String MAPKEY_DELIM = ":";
- private static final String RECORDKEY_DELIM = ":";
- private static final String COLLECTION_DELIM = ",";
- private String collectionDelim;
- private String mapkeyDelim;
- private String recordkeyDelim;
-
- /**
- * Constructs an AvroResolver. Initializes Avro data structure: the Avro
- * record - fields information and the Avro record reader. All Avro data is
- * build from the Avro schema, which is based on the *.avsc file that was
- * passed by the user
- *
- * @param input all input parameters coming from the client
- * @throws IOException if Avro schema could not be retrieved or parsed
- */
- public AvroResolver(InputData input) throws IOException {
- super(input);
-
- Schema schema = isAvroFile() ? getAvroSchema(new Configuration(),
- input.getDataSource())
- : (new Schema.Parser()).parse(openExternalSchema());
-
- reader = new GenericDatumReader<>(schema);
- fields = schema.getFields();
-
- collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
- : input.getUserProperty("COLLECTION_DELIM");
- mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
- : input.getUserProperty("MAPKEY_DELIM");
- recordkeyDelim = input.getUserProperty("RECORDKEY_DELIM") == null ? RECORDKEY_DELIM
- : input.getUserProperty("RECORDKEY_DELIM");
- }
-
- /**
- * Returns a list of the fields of one record. Each record field is
- * represented by a OneField item. OneField item contains two fields: an
- * integer representing the field type and a Java Object representing the
- * field value.
- */
- @Override
- public List<OneField> getFields(OneRow row) throws Exception {
- avroRecord = makeAvroRecord(row.getData(), avroRecord);
- List<OneField> record = new LinkedList<OneField>();
-
- int recordkeyIndex = (inputData.getRecordkeyColumn() == null) ? -1
- : inputData.getRecordkeyColumn().columnIndex();
- int currentIndex = 0;
-
- for (Schema.Field field : fields) {
- /*
- * Add the record key if exists
- */
- if (currentIndex == recordkeyIndex) {
- currentIndex += recordkeyAdapter.appendRecordkeyField(record,
- inputData, row);
- }
-
- currentIndex += populateRecord(record,
- avroRecord.get(field.name()), field.schema());
- }
-
- return record;
- }
-
- /**
- * Tests if the Avro records are residing inside an AVRO file. If the Avro
- * records are not residing inside an AVRO file, then they may reside inside
- * a sequence file, regular file, ...
- *
- * @return whether the resource is an Avro file
- */
- boolean isAvroFile() {
- return inputData.getAccessor().toLowerCase().contains("avro");
- }
-
- /**
- * The record can arrive from one out of two different sources: a sequence
- * file or an AVRO file. If it comes from an AVRO file, then it was already
- * obtained as a {@link GenericRecord} when when it was fetched from the
- * file with the {@link AvroRecorReader} so in this case a cast is enough.
- * On the other hand, if the source is a sequence file, then the input
- * parameter obj hides a bytes [] buffer which is in fact one Avro record
- * serialized. Here, we build the Avro record from the flat buffer, using
- * the AVRO API. Then (for both cases) in the remaining functions we build a
- * {@code List<OneField>} record from the Avro record.
- *
- * @param obj object holding an Avro record
- * @param reuseRecord Avro record to be reused to create new record from obj
- * @return Avro record
- * @throws IOException if creating the Avro record from byte array failed
- */
- GenericRecord makeAvroRecord(Object obj, GenericRecord reuseRecord)
- throws IOException {
- if (isAvroFile()) {
- return (GenericRecord) obj;
- } else {
- byte[] bytes = ((BytesWritable) obj).getBytes();
- decoder = DecoderFactory.get().binaryDecoder(bytes, decoder);
- return reader.read(reuseRecord, decoder);
- }
- }
-
- /**
- * For a given field in the Avro record we extract its value and insert it
- * into the output {@code List<OneField>} record. An Avro field can be a
- * primitive type or an array type.
- *
- * @param record list of fields to be populated
- * @param fieldValue field value
- * @param fieldSchema field schema
- * @return the number of populated fields
- */
- int populateRecord(List<OneField> record, Object fieldValue,
- Schema fieldSchema) {
-
- Schema.Type fieldType = fieldSchema.getType();
- int ret = 0;
- Object value = fieldValue;
-
- switch (fieldType) {
- case ARRAY:
- if(fieldValue == null) {
- return addOneFieldToRecord(record, TEXT, fieldValue);
- }
- List<OneField> listRecord = new LinkedList<>();
- ret = setArrayField(listRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, String.format("[%s]",
- HdfsUtilities.toString(listRecord, collectionDelim)));
- break;
- case MAP:
- if(fieldValue == null) {
- return addOneFieldToRecord(record, TEXT, fieldValue);
- }
- List<OneField> mapRecord = new LinkedList<>();
- ret = setMapField(mapRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, String.format("{%s}",
- HdfsUtilities.toString(mapRecord, collectionDelim)));
- break;
- case RECORD:
- if(fieldValue == null) {
- return addOneFieldToRecord(record, TEXT, fieldValue);
- }
- List<OneField> recRecord = new LinkedList<>();
- ret = setRecordField(recRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT, String.format("{%s}",
- HdfsUtilities.toString(recRecord, collectionDelim)));
- break;
- case UNION:
- /*
- * When an Avro field is actually a union, we resolve the type
- * of the union element, and delegate the record update via
- * recursion
- */
- int unionIndex = GenericData.get().resolveUnion(fieldSchema,
- fieldValue);
- /**
- * Retrieve index of the non null data type from the type array
- * if value is null
- */
- if (fieldValue == null) {
- unionIndex ^= 1;
- }
- ret = populateRecord(record, fieldValue,
- fieldSchema.getTypes().get(unionIndex));
- break;
- case ENUM:
- ret = addOneFieldToRecord(record, TEXT, value);
- break;
- case INT:
- ret = addOneFieldToRecord(record, INTEGER, value);
- break;
- case DOUBLE:
- ret = addOneFieldToRecord(record, FLOAT8, value);
- break;
- case STRING:
- value = (fieldValue != null) ? String.format("%s", fieldValue)
- : null;
- ret = addOneFieldToRecord(record, TEXT, value);
- break;
- case FLOAT:
- ret = addOneFieldToRecord(record, REAL, value);
- break;
- case LONG:
- ret = addOneFieldToRecord(record, BIGINT, value);
- break;
- case BYTES:
- ret = addOneFieldToRecord(record, BYTEA, value);
- break;
- case BOOLEAN:
- ret = addOneFieldToRecord(record, BOOLEAN, value);
- break;
- case FIXED:
- ret = addOneFieldToRecord(record, BYTEA, value);
- break;
- default:
- break;
- }
- return ret;
- }
-
- /**
- * When an Avro field is actually a record, we iterate through each field
- * for each entry, the field name and value are added to a local record
- * {@code List<OneField>} complexRecord with the necessary delimiter we
- * create an object of type OneField and insert it into the output
- * {@code List<OneField>} record.
- *
- * @param record list of fields to be populated
- * @param value field value
- * @param recSchema record schema
- * @return number of populated fields
- */
- int setRecordField(List<OneField> record, Object value, Schema recSchema) {
-
- GenericRecord rec = ((GenericData.Record) value);
- Schema fieldKeySchema = Schema.create(Schema.Type.STRING);
- int currentIndex = 0;
- for (Schema.Field field : recSchema.getFields()) {
- Schema fieldSchema = field.schema();
- Object fieldValue = rec.get(field.name());
- List<OneField> complexRecord = new LinkedList<>();
- populateRecord(complexRecord, field.name(), fieldKeySchema);
- populateRecord(complexRecord, fieldValue, fieldSchema);
- addOneFieldToRecord(record, TEXT,
- HdfsUtilities.toString(complexRecord, recordkeyDelim));
- currentIndex++;
- }
- return currentIndex;
- }
-
- /**
- * When an Avro field is actually a map, we resolve the type of the map
- * value. For each entry, the field name and value are added to a local
- * record, from which we create an object of type OneField and insert it
- * into the output {@code List<OneField>} record.
- *
- * The unchecked warning is suppressed to enable us to cast fieldValue to a Map
- * (since the value schema has been identified to be of type map).
- *
- * @param record list of fields to be populated
- * @param fieldValue field value
- * @param mapSchema map schema
- * @return number of populated fields
- */
- @SuppressWarnings("unchecked")
- int setMapField(List<OneField> record, Object fieldValue, Schema mapSchema) {
- Schema keySchema = Schema.create(Schema.Type.STRING);
- Schema valueSchema = mapSchema.getValueType();
- Map<String, ?> avroMap = ((Map<String, ?>) fieldValue);
- for (Map.Entry<String, ?> entry : avroMap.entrySet()) {
- List<OneField> complexRecord = new LinkedList<>();
- populateRecord(complexRecord, entry.getKey(), keySchema);
- populateRecord(complexRecord, entry.getValue(), valueSchema);
- addOneFieldToRecord(record, TEXT,
- HdfsUtilities.toString(complexRecord, mapkeyDelim));
- }
- return avroMap.size();
- }
-
- /**
- * When an Avro field is actually an array, we resolve the type of the array
- * element, and for each element in the Avro array, we recursively invoke
- * the population of {@code List<OneField>} record.
- *
- * @param record list of fields to be populated
- * @param fieldValue field value
- * @param arraySchema array schema
- * @return number of populated fields
- */
- int setArrayField(List<OneField> record, Object fieldValue,
- Schema arraySchema) {
- Schema typeSchema = arraySchema.getElementType();
- GenericData.Array<?> array = (GenericData.Array<?>) fieldValue;
- int length = array.size();
- for (int i = 0; i < length; i++) {
- populateRecord(record, array.get(i), typeSchema);
- }
- return length;
- }
-
- /**
- * Creates the {@link OneField} object and adds it to the output {@code List<OneField>}
- * record. Strings and byte arrays are held inside special types in the Avro
- * record so we transfer them to standard types in order to enable their
- * insertion in the GPDBWritable instance.
- *
- * @param record list of fields to be populated
- * @param gpdbWritableType field type
- * @param val field value
- * @return 1 (number of populated fields)
- */
- int addOneFieldToRecord(List<OneField> record, DataType gpdbWritableType,
- Object val) {
- OneField oneField = new OneField();
- oneField.type = gpdbWritableType.getOID();
- switch (gpdbWritableType) {
- case BYTEA:
- if (val instanceof ByteBuffer) {
- oneField.val = ((ByteBuffer) val).array();
- } else {
- /*
- * Entry point when the underlying byte array comes from an
- * Avro Fixed value
- */
- oneField.val = ((GenericData.Fixed) val).bytes();
- }
- break;
- default:
- oneField.val = val;
- break;
- }
-
- record.add(oneField);
- return 1;
- }
-
- /**
- * Opens Avro schema based on DATA-SCHEMA parameter.
- *
- * @return InputStream of schema file
- * @throws DataSchemaException if schema file could not be opened
- */
- InputStream openExternalSchema() {
-
- String schemaName = inputData.getUserProperty("DATA-SCHEMA");
-
- /**
- * Testing that the schema name was supplied by the user - schema is an
- * optional property.
- */
- if (schemaName == null) {
- throw new DataSchemaException(SCHEMA_NOT_INDICATED,
- this.getClass().getName());
- }
-
- /** Testing that the schema resource exists. */
- if (this.getClass().getClassLoader().getResource(schemaName) == null) {
- throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName);
- }
- ClassLoader loader = this.getClass().getClassLoader();
- return loader.getResourceAsStream(schemaName);
- }
-}
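For context on the union handling in the resolver removed above, here is a minimal standalone sketch of the index-flip trick for two-branch unions. It assumes only a plain Avro dependency; the UnionIndexDemo class name is illustrative and not part of PXF.

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    public class UnionIndexDemo {
        public static void main(String[] args) {
            // A two-branch union such as ["string", "null"], as produced by an optional Avro field.
            Schema union = Schema.createUnion(Arrays.asList(
                    Schema.create(Schema.Type.STRING),
                    Schema.create(Schema.Type.NULL)));

            // For a non-null value, resolveUnion() returns the index of the matching branch.
            int idx = GenericData.get().resolveUnion(union, "hello");
            System.out.println(union.getTypes().get(idx));          // string

            // For a null value it returns the index of the null branch; flipping the lowest
            // bit (unionIndex ^= 1) selects the other branch, so the column still gets a
            // concrete type even though its value is null.
            int nullIdx = GenericData.get().resolveUnion(union, null);
            System.out.println(union.getTypes().get(nullIdx ^ 1));  // string
        }
    }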
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
deleted file mode 100644
index 1873d7d..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkReader.java
+++ /dev/null
@@ -1,176 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
-
-/**
- * A class that provides a line reader from an input stream. Lines are
- * terminated by '\n' (LF); EOF also terminates an otherwise unterminated line.
- */
-public class ChunkReader implements Closeable {
- public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
- private int bufferSize = DEFAULT_BUFFER_SIZE;
- private InputStream in;
- private byte[] buffer;
- // the number of bytes of real data in the buffer
- private int bufferLength = 0;
- // the current position in the buffer
- private int bufferPosn = 0;
- private static final byte LF = '\n';
-
- /**
- * Constructs a ChunkReader instance
- *
- * @param in input stream
- */
- public ChunkReader(InputStream in) {
- this.in = in;
- this.buffer = new byte[this.bufferSize];
- }
-
- /**
- * Closes the underlying stream.
- */
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- /*
- * Internal class used for holding part of a chunk brought by one read()
- * operation on the input stream. We collect several such nodes in a list by
- * doing several read operations until we reach the chunk size,
- * maxBytesToConsume.
- */
- private class Node {
- /* part of a chunk brought in a single inputstream.read() operation */
- public byte[] slice;
- /* the size of the slice */
- public int len;
- }
-
- /**
- * Reads data in slices of up to the buffer size (DEFAULT_BUFFER_SIZE), until
- * a total of maxBytesToConsume bytes has been consumed.
- *
- * @param str - output parameter, will contain the read chunk byte array
- * @param maxBytesToConsume - requested chunk size
- * @return actual chunk size
- * @throws IOException if the first byte cannot be read for any reason
- * other than the end of the file, if the input stream has been closed,
- * or if some other I/O error occurs.
- */
- public int readChunk(Writable str, int maxBytesToConsume) throws IOException
- {
- ChunkWritable cw = (ChunkWritable) str;
- List<Node> list = new LinkedList<Node>();
-
- long bytesConsumed = 0;
-
- do {
- if (bufferLength > 0) {
- int remaining = bufferLength - bufferPosn;
- Node nd = new Node();
- nd.slice = new byte[remaining];
- nd.len = remaining;
- System.arraycopy(buffer, bufferPosn, nd.slice, 0, nd.len);
- list.add(nd);
- bytesConsumed += nd.len;
- } else {
- Node nd = new Node();
- nd.slice = new byte[buffer.length];
- nd.len = in.read(nd.slice);
- if (nd.len <= 0) {
- break; // EOF
- }
- bytesConsumed += nd.len;
- list.add(nd);
- }
-
- bufferLength = bufferPosn = 0;
-
- } while (bytesConsumed < maxBytesToConsume);
-
- if (list.size() > 0) {
- cw.box = new byte[(int) bytesConsumed];
- int pos = 0;
- for (int i = 0; i < list.size(); i++) {
- Node n = list.get(i);
- System.arraycopy(n.slice, 0, cw.box, pos, n.len);
- pos += n.len;
- }
- }
-
- return (int) bytesConsumed;
- }
-
- /**
- * Reads a line terminated by LF.
- *
- * @param str - output parameter, will contain the read record
- * @param maxBytesToConsume - the line mustn't exceed this value
- * @return length of the line read
- * @throws IOException if the first byte cannot be read for any reason
- * other than the end of the file, if the input stream has been closed,
- * or if some other I/O error occurs.
- */
- public int readLine(Writable str, int maxBytesToConsume) throws IOException {
- ChunkWritable cw = (ChunkWritable) str;
- List<Node> list = new LinkedList<Node>();
-
- boolean newLine = false; // whether a terminating newline was found
- long bytesConsumed = 0;
-
- do {
- int startPosn = bufferPosn; // starting from where we left off the
- // last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
-
- bufferLength = in.read(buffer);
- if (bufferLength <= 0) {
- break; // EOF
- }
- }
-
- for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
- // newline
- if (buffer[bufferPosn] == LF) {
- newLine = true;
- ++bufferPosn; // at next invocation proceed from following
- // byte
- break;
- }
- }
-
- int readLength = bufferPosn - startPosn;
- bytesConsumed += readLength;
-
- if (readLength > 0) {
- Node nd = new Node();
- nd.slice = new byte[readLength];
- nd.len = readLength;
- System.arraycopy(buffer, startPosn, nd.slice, 0, nd.len);
- list.add(nd);
- }
- } while (!newLine && bytesConsumed < maxBytesToConsume);
-
- if (list.size() > 0) {
- cw.box = new byte[(int) bytesConsumed];
- int pos = 0;
- for (int i = 0; i < list.size(); i++) {
- Node n = list.get(i);
- System.arraycopy(n.slice, 0, cw.box, pos, n.len);
- pos += n.len;
- }
- }
-
- return (int) bytesConsumed;
- }
-}
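A minimal usage sketch for the ChunkReader removed above, assuming the pre-rename com.pivotal.pxf package and a readable local file passed as the first argument; the ChunkReaderDemo class is illustrative only, not part of the codebase.

    import java.io.FileInputStream;
    import java.io.InputStream;

    import com.pivotal.pxf.plugins.hdfs.ChunkReader;
    import com.pivotal.pxf.plugins.hdfs.ChunkWritable;

    public class ChunkReaderDemo {
        public static void main(String[] args) throws Exception {
            try (InputStream in = new FileInputStream(args[0])) {
                ChunkReader reader = new ChunkReader(in);
                ChunkWritable chunk = new ChunkWritable();
                int n;
                // Pull ~1MB chunks until readChunk() reports 0 bytes (EOF).
                while ((n = reader.readChunk(chunk, 1024 * 1024)) > 0) {
                    // chunk.box now holds exactly n bytes; chunk boundaries are
                    // not aligned on line boundaries.
                    System.out.println("read " + n + " bytes");
                }
            }
        }
    }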
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
deleted file mode 100644
index 13a3546..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
+++ /dev/null
@@ -1,281 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
-import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdfs.DFSInputStream;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * ChunkRecordReader is designed for fast reading of a file split. The idea is
- * to bring chunks of data instead of single records. The chunks contain many
- * records and the chunk end is not aligned on a record boundary. The chunk
- * size is a hardcoded class constant, CHUNK_SIZE. This behaviour sets this
- * reader apart from other readers, which fetch one record and stop when
- * reaching a record delimiter.
- */
-public class ChunkRecordReader implements
- RecordReader<LongWritable, ChunkWritable> {
- private static final Log LOG = LogFactory.getLog(ChunkRecordReader.class.getName());
-
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private long fileLength;
- private ChunkReader in;
- private FSDataInputStream fileIn;
- private final Seekable filePosition;
- private int maxLineLength;
- private CompressionCodec codec;
- private Decompressor decompressor;
- private static final int CHUNK_SIZE = 1024 * 1024;
-
- /**
- * Translates the FSDataInputStream into a DFSInputStream.
- */
- private DFSInputStream getInputStream() {
- return (DFSInputStream) (fileIn.getWrappedStream());
- }
-
- /**
- * Returns statistics of the input stream's read operation: total bytes
- * read, bytes read locally, bytes read in short-circuit (directly from file
- * descriptor).
- *
- * @return an instance of ReadStatistics class
- */
- public ReadStatistics getReadStatistics() {
- return getInputStream().getReadStatistics();
- }
-
- /**
- * Constructs a ChunkRecordReader instance.
- *
- * @param job the job configuration
- * @param split contains the file name, begin byte of the split and the
- * bytes length
- * @throws IOException if an I/O error occurs when accessing the file or
- * creating input stream to read from it
- */
- public ChunkRecordReader(Configuration job, FileSplit split)
- throws IOException {
- maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
- validateLength(maxLineLength);
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
- compressionCodecs = new CompressionCodecFactory(job);
- codec = compressionCodecs.getCodec(file);
-
- // open the file and seek to the start of the split
- job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
- final FileSystem fs = file.getFileSystem(job);
- fs.setVerifyChecksum(false);
- fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
- fileLength = getInputStream().getFileLength();
- if (isCompressedInput()) {
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
- fileIn, decompressor, start, end,
- SplittableCompressionCodec.READ_MODE.BYBLOCK);
- in = new ChunkReader(cIn);
- start = cIn.getAdjustedStart();
- end = cIn.getAdjustedEnd();
- filePosition = cIn; // take pos from compressed stream
- } else {
- in = new ChunkReader(codec.createInputStream(fileIn,
- decompressor));
- filePosition = fileIn;
- }
- } else {
- fileIn.seek(start);
- in = new ChunkReader(fileIn);
- filePosition = fileIn;
- }
- /*
- * If this is not the first split, we always throw away the first record
- * because we always (except for the last split) read one extra line in
- * the next() method.
- */
- if (start != 0) {
- start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
- }
- this.pos = start;
- }
-
- /**
- * Used by the client of this class to create the 'key' output parameter for
- * the next() method.
- *
- * @return an instance of LongWritable
- */
- @Override
- public LongWritable createKey() {
- return new LongWritable();
- }
-
- /**
- * Used by the client of this class to create the 'value' output parameter
- * for the next() method.
- *
- * @return an instance of ChunkWritable
- */
- @Override
- public ChunkWritable createValue() {
- return new ChunkWritable();
- }
-
- /**
- * Fetches the next data chunk from the file split. The size of the chunk is
- * a class hardcoded parameter - CHUNK_SIZE. This behaviour sets this reader
- * apart from the other readers which will fetch one record and stop when
- * reaching a record delimiter.
- *
- * @param key - output parameter. When the method returns, it will contain
- * the key - the byte offset at which the chunk starts
- * @param value - output parameter. When the method returns, it will contain
- * the value - the chunk, a byte array inside the ChunkWritable
- * instance
- * @return true if a chunk was read, false when the end of the split was reached
- * @throws IOException if an I/O error occurred while reading the next chunk
- * or line
- */
- @Override
- public synchronized boolean next(LongWritable key, ChunkWritable value)
- throws IOException {
- /*
- * Usually a record is spread between the end of the current split and the
- * beginning of the next split. So when reading the last record in the split
- * we usually need to cross over to the next split. This tricky logic is
- * implemented in ChunkReader.readLine(). In order not to rewrite this
- * logic we will read the last chunk in the split with readLine(). For a
- * split of 120M, reading the last 1M line by line doesn't have a huge
- * impact. We apply a factor to the last chunk to make sure we start
- * before the last record.
- */
- float factor = 1.5f;
- int limit = (int) (factor * CHUNK_SIZE);
- long curPos = getFilePosition();
- int newSize = 0;
-
- while (curPos <= end) {
- key.set(pos);
-
- if ((end - curPos) > limit) {
- newSize = in.readChunk(value, CHUNK_SIZE);
- } else {
- newSize = in.readLine(value,
- Math.max(maxBytesToConsume(pos), maxLineLength));
- }
- if (newSize == 0) {
- break;
- }
-
- pos += newSize;
-
- if (pos == fileLength) { /*
- * in case the text file's last character is
- * not a linefeed
- */
- if (value.box[value.box.length - 1] != '\n') {
- int newLen = value.box.length + 1;
- byte[] tmp = new byte[newLen];
- System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
- tmp[newLen - 1] = '\n';
- value.box = tmp;
- }
- }
-
- return true;
- }
- /*
- * if we got here, either newSize was 0 or curPos is bigger than end
- */
-
- return false;
- }
-
- /**
- * Gets the progress within the split.
- */
- @Override
- public synchronized float getProgress() throws IOException {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (getFilePosition() - start)
- / (float) (end - start));
- }
- }
-
- /**
- * Returns the position of the unread tail of the file
- *
- * @return pos - start byte of the unread tail of the file
- */
- @Override
- public synchronized long getPos() throws IOException {
- return pos;
- }
-
- /**
- * Closes the input stream.
- */
- @Override
- public synchronized void close() throws IOException {
- try {
- if (in != null) {
- in.close();
- }
- } finally {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
- }
-
- private void validateLength(int maxLineLength) {
- if (maxLineLength <= 0)
- throw new IllegalArgumentException(
- "maxLineLength must be a positive value");
- }
-
- private boolean isCompressedInput() {
- return (codec != null);
- }
-
- private int maxBytesToConsume(long pos) {
- return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
- Integer.MAX_VALUE, end - pos);
- }
-
- private long getFilePosition() throws IOException {
- long retVal;
- if (isCompressedInput() && null != filePosition) {
- retVal = filePosition.getPos();
- } else {
- retVal = pos;
- }
- return retVal;
- }
-} // class ChunkRecordReader
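A sketch of how a driver might exercise the ChunkRecordReader removed above, assuming an HDFS path (the reader unwraps its stream to a DFSInputStream, so local files will not work) and the pre-rename package names; the ChunkRecordReaderDemo class, path argument, and 128MB split length are placeholders only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapred.FileSplit;

    import com.pivotal.pxf.plugins.hdfs.ChunkRecordReader;
    import com.pivotal.pxf.plugins.hdfs.ChunkWritable;

    public class ChunkRecordReaderDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Placeholder split over the first 128MB of an HDFS file; in PXF the
            // fragmenter supplies the real split boundaries.
            FileSplit split = new FileSplit(new Path(args[0]), 0L,
                    128L * 1024 * 1024, (String[]) null);

            ChunkRecordReader reader = new ChunkRecordReader(conf, split);
            LongWritable key = reader.createKey();
            ChunkWritable value = reader.createValue();
            try {
                while (reader.next(key, value)) {
                    // key is the start byte of the chunk, value.box holds the chunk bytes.
                    System.out.println("chunk at " + key.get() + ", "
                            + value.box.length + " bytes");
                }
            } finally {
                reader.close();
            }
        }
    }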
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
deleted file mode 100644
index a0a8b17..0000000
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.lang.UnsupportedOperationException;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Just an output buffer for the ChunkRecordReader. It must implement Writable,
- * otherwise it will not fit into the next() interface method.
- */
-public class ChunkWritable implements Writable {
- public byte [] box;
-
- /**
- * Serializes the fields of this object to <code>out</code>.
- *
- * @param out <code>DataOutput</code> to serialize this object into.
- * @throws UnsupportedOperationException this function is not supported
- */
- @Override
- public void write(DataOutput out) {
- throw new UnsupportedOperationException("ChunkWritable.write() is not implemented");
- }
-
- /**
- * Deserializes the fields of this object from <code>in</code>.
- * <p>For efficiency, implementations should attempt to re-use storage in the
- * existing object where possible.</p>
- *
- * @param in <code>DataInput</code> to deserialize this object from.
- * @throws UnsupportedOperationException this function is not supported
- */
- @Override
- public void readFields(DataInput in) {
- throw new UnsupportedOperationException("ChunkWritable.readFields() is not implemented");
- }
-}
\ No newline at end of file