You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:09:53 UTC
[03/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace
change
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java
new file mode 100644
index 0000000..126709f
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeInputBuilder.java
@@ -0,0 +1,51 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInput;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Turns the raw data read from a {@link DataInput} into the list of
+ * {@link OneField} objects that make up one record.
+ */
+public class BridgeInputBuilder {
+    private static final Log LOG = LogFactory.getLog(BridgeInputBuilder.class);
+    private ProtocolData protocolData;
+
+    /**
+     * Constructs a BridgeInputBuilder.
+     *
+     * @param protocolData request data; its output format selects how the
+     *            input stream is parsed
+     * @throws Exception declared for callers; nothing is thrown here
+     */
+    public BridgeInputBuilder(ProtocolData protocolData) throws Exception {
+        this.protocolData = protocolData;
+    }
+
+    /**
+     * Reads one record from the stream.
+     * <p>
+     * For the TEXT output format the data is read through a {@link Text}
+     * object and returned as a single BYTEA field. For any other format a
+     * {@link GPDBWritable} is read and every column becomes one field.
+     *
+     * @param inputStream stream to read the record from
+     * @return the record fields, or null when the GPDBWritable read from the
+     *         stream is empty
+     * @throws Exception if reading or mapping the record fails
+     */
+    public List<OneField> makeInput(DataInput inputStream) throws Exception {
+        if (protocolData.outputFormat() != OutputFormat.TEXT) {
+            return readGPDBWritableRecord(inputStream);
+        }
+
+        Text line = new Text();
+        line.readFields(inputStream);
+        OneField field = new OneField(DataType.BYTEA.getOID(), line.getBytes());
+        return Collections.singletonList(field);
+    }
+
+    /*
+     * Reads a GPDBWritable from the stream and maps each of its columns to a
+     * OneField carrying the column type code and the column value.
+     */
+    private List<OneField> readGPDBWritableRecord(DataInput inputStream) throws Exception {
+        GPDBWritable writable = new GPDBWritable();
+        writable.readFields(inputStream);
+
+        if (writable.isEmpty()) {
+            LOG.debug("Reached end of stream");
+            return null;
+        }
+
+        GPDBWritableMapper columnMapper = new GPDBWritableMapper(writable);
+        int[] columnTypes = writable.getColType();
+        List<OneField> fields = new LinkedList<OneField>();
+        int column = 0;
+        while (column < columnTypes.length) {
+            int typeCode = columnTypes[column];
+            // the mapper must be told the type before the value is fetched
+            columnMapper.setDataType(typeCode);
+            fields.add(new OneField(typeCode, columnMapper.getData(column)));
+            column++;
+        }
+
+        return fields;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
new file mode 100644
index 0000000..99255fa
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -0,0 +1,288 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.BufferWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.lang.ObjectUtils;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.TEXT;
+
+/**
+ * Class creates the output record that is piped by the java process to the HAWQ
+ * backend. Actually, the output record is serialized and the obtained byte
+ * string is piped to the HAWQ segment. The output record will implement
+ * Writable, and the mission of BridgeOutputBuilder will be to translate a list
+ * of {@link OneField} objects (obtained from the Resolver) into an output
+ * record.
+ */
+public class BridgeOutputBuilder {
+ private ProtocolData inputData;
+ private Writable output = null;
+ private GPDBWritable errorRecord = null;
+ private int[] schema;
+ private String[] colNames;
+
+ /**
+ * Constructs a BridgeOutputBuilder.
+ *
+ * @param input input data, like requested output format and schema
+ * information
+ */
+ public BridgeOutputBuilder(ProtocolData input) {
+ inputData = input;
+ makeErrorRecord();
+ }
+
+ /**
+ * We need a separate GPDBWritable record to represent the error record.
+ * Just setting the errorFlag on the "output" GPDBWritable variable is not
+ * good enough, since the GPDBWritable is built only after the first record
+ * is read from the file. And if we encounter an error while fetching the
+ * first record from the file, then the output member will be null. The
+ * reason we cannot count on the schema to build the GPDBWritable output
+ * variable before reading the first record, is because the schema does not
+ * account for arrays - we cannot know from the schema the length of an
+ * array. We find out only after fetching the first record.
+ */
+ void makeErrorRecord() {
+ int[] errSchema = { TEXT.getOID() };
+
+ if (inputData.outputFormat() != OutputFormat.BINARY) {
+ return;
+ }
+
+ errorRecord = new GPDBWritable(errSchema);
+ errorRecord.setError(true);
+ }
+
+ /**
+ * Returns the error record. If the output format is not binary, error
+ * records are not supported, and the given exception will be thrown
+ *
+ * @param ex exception to be stored in record
+ * @return error record
+ * @throws Exception if the output format is not binary
+ */
+ public Writable getErrorOutput(Exception ex) throws Exception {
+ if (inputData.outputFormat() == OutputFormat.BINARY) {
+ errorRecord.setString(0, ex.getMessage());
+ return errorRecord;
+ } else {
+ throw ex;
+ }
+ }
+
+ /**
+ * Translates recFields (obtained from the Resolver) into an output record.
+ *
+ * @param recFields record fields to be serialized
+ * @return Writable object with serialized row
+ * @throws BadRecordException if building the output record failed
+ */
+ public Writable makeOutput(List<OneField> recFields)
+ throws BadRecordException {
+ if (output == null && inputData.outputFormat() == OutputFormat.BINARY) {
+ makeGPDBWritableOutput();
+ }
+
+ fillOutputRecord(recFields);
+
+ return output;
+ }
+
+ /**
+ * Creates the GPDBWritable object. The object is created one time and is
+ * refilled from recFields for each record sent
+ *
+ * @return empty GPDBWritable object with set columns
+ */
+ GPDBWritable makeGPDBWritableOutput() {
+ int num_actual_fields = inputData.getColumns();
+ schema = new int[num_actual_fields];
+ colNames = new String[num_actual_fields];
+
+ for (int i = 0; i < num_actual_fields; i++) {
+ schema[i] = inputData.getColumn(i).columnTypeCode();
+ colNames[i] = inputData.getColumn(i).columnName();
+ }
+
+ output = new GPDBWritable(schema);
+
+ return (GPDBWritable) output;
+ }
+
+ /**
+ * Fills the output record based on the fields in recFields.
+ *
+ * @param recFields record fields
+ * @throws BadRecordException if building the output record failed
+ */
+ void fillOutputRecord(List<OneField> recFields) throws BadRecordException {
+ if (inputData.outputFormat() == OutputFormat.BINARY) {
+ fillGPDBWritable(recFields);
+ } else {
+ fillText(recFields);
+ }
+ }
+
+ /**
+ * Fills a GPDBWritable object based on recFields. The input record
+ * recFields must correspond to schema. If the record has more or less
+ * fields than the schema we throw an exception. We require that the type of
+ * field[i] in recFields corresponds to the type of field[i] in the schema.
+ *
+ * @param recFields record fields
+ * @throws BadRecordException if building the output record failed
+ */
+ void fillGPDBWritable(List<OneField> recFields) throws BadRecordException {
+ int size = recFields.size();
+ if (size == 0) { // size 0 means the resolver couldn't deserialize any
+ // of the record fields
+ throw new BadRecordException("No fields in record");
+ } else if (size != schema.length) {
+ throw new BadRecordException("Record has " + size
+ + " fields but the schema size is " + schema.length);
+ }
+
+ for (int i = 0; i < size; i++) {
+ OneField current = recFields.get(i);
+ if (!isTypeInSchema(current.type, schema[i])) {
+ throw new BadRecordException("For field " + colNames[i]
+ + " schema requires type "
+ + DataType.get(schema[i]).toString()
+ + " but input record has type "
+ + DataType.get(current.type).toString());
+ }
+
+ fillOneGPDBWritableField(current, i);
+ }
+ }
+
+ /**
+ * Tests if data type is a string type. String type is a type that can be
+ * serialized as string, such as varchar, bpchar, text, numeric, timestamp,
+ * date.
+ *
+ * @param type data type
+ * @return whether data type is string type
+ */
+ boolean isStringType(DataType type) {
+ return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT,
+ DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains(
+ type);
+ }
+
+ /**
+ * Tests if record field type and schema type correspond.
+ *
+ * @param recType record type code
+ * @param schemaType schema type code
+ * @return whether record type and schema type match
+ */
+ boolean isTypeInSchema(int recType, int schemaType) {
+ DataType dtRec = DataType.get(recType);
+ DataType dtSchema = DataType.get(schemaType);
+
+ return (dtSchema == DataType.UNSUPPORTED_TYPE || dtRec == dtSchema || (isStringType(dtRec) && isStringType(dtSchema)));
+ }
+
+ /**
+ * Fills a Text object based on recFields.
+ *
+ * @param recFields record fields
+ * @throws BadRecordException if text formatted record has more than one field
+ */
+ void fillText(List<OneField> recFields) throws BadRecordException {
+ /*
+ * For the TEXT case there must be only one record in the list
+ */
+ if (recFields.size() != 1) {
+ throw new BadRecordException(
+ "BridgeOutputBuilder must receive one field when handling the TEXT format");
+ }
+
+ OneField fld = recFields.get(0);
+ int type = fld.type;
+ Object val = fld.val;
+ if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor
+ output = new BufferWritable((byte[]) val);
+ } else { // from QuotedLineBreakAccessor
+ String textRec = (String) val;
+ output = new Text(textRec + "\n");
+ }
+ }
+
+ /**
+ * Fills one GPDBWritable field.
+ *
+ * @param oneField field
+ * @param colIdx column index
+ * @throws BadRecordException if field type is not supported or doesn't match the schema
+ */
+ void fillOneGPDBWritableField(OneField oneField, int colIdx)
+ throws BadRecordException {
+ int type = oneField.type;
+ Object val = oneField.val;
+ GPDBWritable GPDBoutput = (GPDBWritable) output;
+ try {
+ switch (DataType.get(type)) {
+ case INTEGER:
+ GPDBoutput.setInt(colIdx, (Integer) val);
+ break;
+ case FLOAT8:
+ GPDBoutput.setDouble(colIdx, (Double) val);
+ break;
+ case REAL:
+ GPDBoutput.setFloat(colIdx, (Float) val);
+ break;
+ case BIGINT:
+ GPDBoutput.setLong(colIdx, (Long) val);
+ break;
+ case SMALLINT:
+ GPDBoutput.setShort(colIdx, (Short) val);
+ break;
+ case BOOLEAN:
+ GPDBoutput.setBoolean(colIdx, (Boolean) val);
+ break;
+ case BYTEA:
+ byte[] bts = null;
+ if (val != null) {
+ int length = Array.getLength(val);
+ bts = new byte[length];
+ for (int j = 0; j < length; j++) {
+ bts[j] = Array.getByte(val, j);
+ }
+ }
+ GPDBoutput.setBytes(colIdx, bts);
+ break;
+ case VARCHAR:
+ case BPCHAR:
+ case CHAR:
+ case TEXT:
+ case NUMERIC:
+ case TIMESTAMP:
+ case DATE:
+ GPDBoutput.setString(colIdx, ObjectUtils.toString(val, null));
+ break;
+ default:
+ String valClassName = (val != null) ? val.getClass().getSimpleName()
+ : null;
+ throw new UnsupportedOperationException(valClassName
+ + " is not supported for HAWQ conversion");
+ }
+ } catch (TypeMismatchException e) {
+ throw new BadRecordException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java
new file mode 100644
index 0000000..1ea2f86
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmenterFactory.java
@@ -0,0 +1,17 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+
+/**
+ * Factory for {@link Fragmenter} objects. Callers only see the
+ * {@link Fragmenter} abstraction; the concrete class is chosen by name at
+ * run time.
+ */
+public class FragmenterFactory {
+    /**
+     * Creates the fragmenter whose class name is given by
+     * {@code inputData.getFragmenter()}.
+     *
+     * @param inputData request data, also handed to the created instance
+     * @return the created fragmenter
+     * @throws Exception if the instance cannot be created
+     */
+    public static Fragmenter create(InputData inputData) throws Exception {
+        Object instance = Utilities.createAnyInstance(InputData.class,
+                inputData.getFragmenter(), inputData);
+        return (Fragmenter) instance;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
new file mode 100644
index 0000000..47f883c
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
@@ -0,0 +1,63 @@
+package org.apache.hawq.pxf.service;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hawq.pxf.api.Fragment;
+
+/**
+ * Class for serializing fragments metadata in JSON format.
+ * The class implements {@link StreamingOutput} so the serialization will be
+ * done in a stream and not in one bulk, this in order to avoid running
+ * out of memory when processing a lot of fragments.
+ */
+public class FragmentsResponse implements StreamingOutput {
+
+    private static final Log LOG = LogFactory.getLog(FragmentsResponse.class);
+
+    /* JSON is always emitted as UTF-8, independent of the platform default charset */
+    private static final String JSON_CHARSET = "UTF-8";
+
+    private List<Fragment> fragments;
+
+    /**
+     * Constructs fragments response out of a list of fragments
+     *
+     * @param fragments fragment list
+     */
+    public FragmentsResponse(List<Fragment> fragments) {
+        this.fragments = fragments;
+    }
+
+    /**
+     * Serializes a fragments list in JSON,
+     * To be used as the result string for HAWQ.
+     * An example result is as follows:
+     * {@code {"PXFFragments":[{"replicas":["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],"sourceName":"text2.csv", "index":"0", "metadata":<base64 metadata for fragment>, "userData":"<data_specific_to_third_party_fragmenter>"},{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"],"sourceName":"text_data.csv","index":"0","metadata":<base64 metadata for fragment>,"userData":"<data_specific_to_third_party_fragmenter>"}]}}
+     *
+     * @param output stream the JSON is written to; it is not closed here
+     * @throws IOException if serializing a fragment or writing to the stream fails
+     * @throws WebApplicationException declared by {@link StreamingOutput}
+     */
+    @Override
+    public void write(OutputStream output) throws IOException,
+            WebApplicationException {
+        DataOutputStream dos = new DataOutputStream(output);
+        ObjectMapper mapper = new ObjectMapper();
+
+        dos.write("{\"PXFFragments\":[".getBytes(JSON_CHARSET));
+
+        // fragments are written one at a time so the whole list is never
+        // held in memory as a single string
+        String prefix = "";
+        for (Fragment fragment : fragments) {
+            /* metaData and userData are automatically converted to Base64 */
+            String json = prefix + mapper.writeValueAsString(fragment);
+            prefix = ",";
+            dos.write(json.getBytes(JSON_CHARSET));
+        }
+
+        dos.write("]}".getBytes(JSON_CHARSET));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
new file mode 100644
index 0000000..0e9c47f
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
@@ -0,0 +1,135 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Utility class for converting Fragments into a {@link FragmentsResponse}
+ * that will serialize them into JSON format.
+ */
+public class FragmentsResponseFormatter {
+
+    private static final Log LOG = LogFactory.getLog(FragmentsResponseFormatter.class);
+
+    /**
+     * Converts Fragments list to FragmentsResponse
+     * after replacing host name by their respective IPs.
+     *
+     * @param fragments list of fragments
+     * @param data data (e.g. path) related to the fragments
+     * @return FragmentsResponse with given fragments
+     * @throws UnknownHostException if converting host names to IP fails
+     */
+    public static FragmentsResponse formatResponse(List<Fragment> fragments, String data) throws UnknownHostException {
+        /* print the raw fragment list to log when in debug level */
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Fragments before conversion to IP list:");
+            printList(fragments, data);
+        }
+
+        /* HD-2550: convert host names to IPs */
+        convertHostsToIPs(fragments);
+
+        updateFragmentIndex(fragments);
+
+        /* print the fragment list to log when in debug level */
+        if (LOG.isDebugEnabled()) {
+            printList(fragments, data);
+        }
+
+        return new FragmentsResponse(fragments);
+    }
+
+    /**
+     * Updates the fragments' indexes so that it is incremented by sourceName.
+     * (E.g.: {"a", 0}, {"a", 1}, {"b", 0} ... )
+     * The index restarts at 0 whenever the source name differs from the one
+     * of the preceding fragment.
+     *
+     * @param fragments fragments to be updated
+     */
+    private static void updateFragmentIndex(List<Fragment> fragments) {
+
+        String sourceName = null;
+        int index = 0;
+        for (Fragment fragment : fragments) {
+
+            String currentSourceName = fragment.getSourceName();
+            if (!currentSourceName.equals(sourceName)) {
+                index = 0;
+                sourceName = currentSourceName;
+            }
+            fragment.setIndex(index++);
+        }
+    }
+
+    /**
+     * Converts hosts to their matching IP addresses. Fragments without
+     * replicas are left untouched.
+     *
+     * @param fragments fragments whose replicas are replaced in place
+     * @throws UnknownHostException if converting host name to IP fails
+     */
+    private static void convertHostsToIPs(List<Fragment> fragments) throws UnknownHostException {
+        /* host converted to IP map. Used to limit network calls. */
+        HashMap<String, String> hostToIpMap = new HashMap<String, String>();
+
+        for (Fragment fragment : fragments) {
+            String[] hosts = fragment.getReplicas();
+            if (hosts == null) {
+                continue;
+            }
+            String[] ips = new String[hosts.length];
+
+            for (int i = 0; i < hosts.length; i++) {
+                String host = hosts[i];
+                String convertedIp = hostToIpMap.get(host);
+                if (convertedIp == null) {
+                    /* find host's IP, and add to map */
+                    InetAddress addr = InetAddress.getByName(host);
+                    convertedIp = addr.getHostAddress();
+                    hostToIpMap.put(host, convertedIp);
+                }
+
+                /* update IPs array */
+                ips[i] = convertedIp;
+            }
+            fragment.setReplicas(ips);
+        }
+    }
+
+    /*
+     * Converts a fragments list to a readable string and prints it to the log.
+     * Intended for debugging purposes only.
+     * 'datapath' is the data path part of the original URI (e.g., table name, *.csv, etc).
+     */
+    private static void printList(List<Fragment> fragments, String datapath) {
+        LOG.debug("List of " +
+                (fragments.isEmpty() ? "no" : fragments.size()) + " fragments for \"" +
+                datapath + "\"");
+
+        int i = 0;
+        for (Fragment fragment : fragments) {
+            StringBuilder result = new StringBuilder();
+            result.append("Fragment #").append(++i).append(": [")
+                    .append("Source: ").append(fragment.getSourceName())
+                    .append(", Index: ").append(fragment.getIndex())
+                    .append(", Replicas:");
+            /* replicas may be null - convertHostsToIPs tolerates that as well */
+            String[] replicas = fragment.getReplicas();
+            if (replicas != null) {
+                for (String host : replicas) {
+                    result.append(" ").append(host);
+                }
+            }
+
+            result.append(", Metadata: ").append(new String(fragment.getMetadata()));
+
+            if (fragment.getUserData() != null) {
+                result.append(", User Data: ").append(new String(fragment.getUserData()));
+            }
+            result.append("] ");
+            LOG.debug(result);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java
new file mode 100644
index 0000000..7615e54
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/GPDBWritableMapper.java
@@ -0,0 +1,115 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
+
+/**
+ * Class for mapping GPDBWritable get functions to java types.
+ * Call {@link #setDataType(int)} to select the getter for a column type, then
+ * {@link #getData(int)} to read a column through it.
+ */
+public class GPDBWritableMapper {
+
+    private GPDBWritable gpdbWritable;
+    private int type;
+    private DataGetter getter = null;
+
+    /**
+     * Constructs a mapper over the given record.
+     *
+     * @param gpdbWritable record whose columns will be read
+     */
+    public GPDBWritableMapper(GPDBWritable gpdbWritable) {
+        this.gpdbWritable = gpdbWritable;
+    }
+
+    /**
+     * Selects the getter used by subsequent {@link #getData(int)} calls.
+     *
+     * @param type type code of the column to be read
+     * @throws UnsupportedTypeException if there is no getter for the type
+     */
+    public void setDataType(int type) throws UnsupportedTypeException {
+        this.type = type;
+        getter = getterFor(type);
+    }
+
+    /**
+     * Reads a column using the getter selected by the last successful
+     * {@link #setDataType(int)} call.
+     *
+     * @param colIdx column index
+     * @return column value
+     * @throws TypeMismatchException if the underlying GPDBWritable getter
+     *             rejects the column
+     */
+    public Object getData(int colIdx) throws TypeMismatchException {
+        return getter.getData(colIdx);
+    }
+
+    @Override
+    public String toString() {
+        return "getter type = " + GPDBWritable.getTypeName(type);
+    }
+
+    /* Picks the getter implementation matching the given type code. */
+    private DataGetter getterFor(int typeCode) throws UnsupportedTypeException {
+        switch (DataType.get(typeCode)) {
+            case BOOLEAN:
+                return new BooleanDataGetter();
+            case BYTEA:
+                return new BytesDataGetter();
+            case BIGINT:
+                return new LongDataGetter();
+            case SMALLINT:
+                return new ShortDataGetter();
+            case INTEGER:
+                return new IntDataGetter();
+            case TEXT:
+                return new StringDataGetter();
+            case REAL:
+                return new FloatDataGetter();
+            case FLOAT8:
+                return new DoubleDataGetter();
+            default:
+                throw new UnsupportedTypeException(
+                        "Type " + GPDBWritable.getTypeName(typeCode) +
+                        " is not supported by GPDBWritable");
+        }
+    }
+
+    /* Reads one column of the wrapped GPDBWritable as a java object. */
+    private interface DataGetter {
+        Object getData(int colIdx) throws TypeMismatchException;
+    }
+
+    private class BooleanDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getBoolean(colIdx);
+        }
+    }
+
+    private class BytesDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getBytes(colIdx);
+        }
+    }
+
+    private class ShortDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getShort(colIdx);
+        }
+    }
+
+    private class IntDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getInt(colIdx);
+        }
+    }
+
+    private class LongDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getLong(colIdx);
+        }
+    }
+
+    private class FloatDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getFloat(colIdx);
+        }
+    }
+
+    private class DoubleDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getDouble(colIdx);
+        }
+    }
+
+    private class StringDataGetter implements DataGetter {
+        @Override
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getString(colIdx);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
new file mode 100644
index 0000000..cfd1105
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
@@ -0,0 +1,14 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.MetadataFetcher;
+
+/**
+ * Factory class for creation of {@link MetadataFetcher} objects.
+ * The actual {@link MetadataFetcher} object is "hidden" behind an {@link MetadataFetcher}
+ * abstract class which is returned by the MetadataFetcherFactory.
+ */
+public class MetadataFetcherFactory {
+ static public MetadataFetcher create(String fetcherName) throws Exception {
+ return (MetadataFetcher) Class.forName(fetcherName).newInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
new file mode 100644
index 0000000..6f33f2a
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
@@ -0,0 +1,86 @@
+package org.apache.hawq.pxf.service;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import org.apache.hawq.pxf.api.Metadata;
+
+/**
+ * Utility class for converting {@link Metadata} into a JSON format.
+ */
+public class MetadataResponseFormatter {
+
+    private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class);
+
+    /**
+     * Converts {@link Metadata} to JSON String format.
+     *
+     * @param metadata metadata to convert
+     * @return JSON formatted response
+     * @throws IOException if converting the data to JSON fails
+     * @throws IllegalArgumentException if metadata is null or has no fields
+     */
+    public static String formatResponseString(Metadata metadata) throws IOException {
+        /* print the metadata before serialization; skip building the string when debug is off */
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(metadataToString(metadata));
+        }
+
+        return metadataToJSON(metadata);
+    }
+
+    /**
+     * Serializes a metadata in JSON,
+     * To be used as the result string for HAWQ.
+     * An example result is as follows:
+     *
+     * {"PXFMetadata":[{"table":{"dbName":"default","tableName":"t1"},"fields":[{"name":"a","type":"int"},{"name":"b","type":"float"}]}]}
+     */
+    private static String metadataToJSON(Metadata metadata) throws IOException {
+
+        if (metadata == null) {
+            throw new IllegalArgumentException("metadata object is null - cannot serialize");
+        }
+
+        if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+            throw new IllegalArgumentException("metadata contains no fields - cannot serialize");
+        }
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.setSerializationInclusion(Inclusion.NON_EMPTY); // ignore empty fields
+
+        /* the single table is wrapped in an array: preparation for supporting multiple tables */
+        StringBuilder result = new StringBuilder("{\"PXFMetadata\":[");
+        result.append(mapper.writeValueAsString(metadata));
+        return result.append("]}").toString();
+    }
+
+    /**
+     * Converts metadata to a readable string.
+     * Intended for debugging purposes only.
+     */
+    private static String metadataToString(Metadata metadata) {
+        if (metadata == null) {
+            return "No metadata";
+        }
+
+        StringBuilder result = new StringBuilder("Metadata for table \"");
+        result.append(metadata.getTable()).append("\": ");
+
+        if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+            result.append("no fields in table");
+            return result.toString();
+        }
+
+        int i = 0;
+        for (Metadata.Field field : metadata.getFields()) {
+            result.append("Field #").append(++i).append(": [")
+                    .append("Name: ").append(field.getName())
+                    .append(", Type: ").append(field.getType()).append("] ");
+        }
+
+        return result.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
new file mode 100644
index 0000000..9497c6c
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -0,0 +1,128 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.nio.charset.CharacterCodingException;
+import java.util.zip.ZipException;
+
+/*
+ * ReadBridge class creates appropriate accessor and resolver.
+ * It will then create the correct output conversion
+ * class (e.g. Text or GPDBWritable) and get records from accessor,
+ * let resolver deserialize them and reserialize them using the
+ * output conversion class.
+ *
+ * The class handles BadRecordException and other exception type
+ * and marks the record as invalid for GPDB.
+ */
+public class ReadBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(ReadBridge.class);
+
+    ReadAccessor fileAccessor = null;
+    ReadResolver fieldsResolver = null;
+    BridgeOutputBuilder outputBuilder = null;
+
+    /*
+     * C'tor - set the implementation of the bridge
+     */
+    public ReadBridge(ProtocolData protData) throws Exception {
+        outputBuilder = new BridgeOutputBuilder(protData);
+        fileAccessor = getFileAccessor(protData);
+        fieldsResolver = getFieldsResolver(protData);
+    }
+
+    /*
+     * Accesses the underlying HDFS file
+     */
+    @Override
+    public boolean beginIteration() throws Exception {
+        return fileAccessor.openForRead();
+    }
+
+    /*
+     * Fetch next object from file and turn it into a record that the GPDB backend can process.
+     * Returns null, after closing the accessor, when the accessor has no more objects.
+     * Data problems (BadRecordException and the IOExceptions recognized by isDataException)
+     * are turned into an error record; on any other exception the accessor is closed
+     * and the exception is rethrown.
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        OneRow onerow = null;
+        try {
+            onerow = fileAccessor.readNextObject();
+            if (onerow == null) {
+                fileAccessor.closeForRead();
+                return null;
+            }
+
+            return outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+        } catch (IOException ex) {
+            if (!isDataException(ex)) {
+                fileAccessor.closeForRead();
+                throw ex;
+            }
+            return makeErrorOutput(ex);
+        } catch (BadRecordException ex) {
+            /* build the row description only when it is going to be logged */
+            if (LOG.isDebugEnabled()) {
+                String row_info = (onerow != null) ? onerow.toString() : "null";
+                if (ex.getCause() != null) {
+                    LOG.debug("BadRecordException " + ex.getCause().toString() + ": " + row_info);
+                } else {
+                    LOG.debug(ex.toString() + ": " + row_info);
+                }
+            }
+            return makeErrorOutput(ex);
+        } catch (Exception ex) {
+            fileAccessor.closeForRead();
+            throw ex;
+        }
+    }
+
+    /*
+     * Asks the output builder for an error record describing ex. If the builder throws
+     * instead of returning a record, the accessor is closed before the exception
+     * propagates, the same as on every other failing path of getNext.
+     */
+    private Writable makeErrorOutput(Exception ex) throws Exception {
+        try {
+            return outputBuilder.getErrorOutput(ex);
+        } catch (Exception rethrown) {
+            fileAccessor.closeForRead();
+            throw rethrown;
+        }
+    }
+
+    public static ReadAccessor getFileAccessor(InputData inputData) throws Exception {
+        return (ReadAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    }
+
+    public static ReadResolver getFieldsResolver(InputData inputData) throws Exception {
+        return (ReadResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    }
+
+    /*
+     * There are many exceptions that inherit IOException. Some of them like EOFException are generated
+     * due to a data problem, and not because of an IO/connection problem as the father IOException
+     * might lead us to believe. For example, an EOFException will be thrown while fetching a record
+     * from a sequence file, if there is a formatting problem in the record. Fetching record from
+     * the sequence-file is the responsibility of the accessor so the exception will be thrown from the
+     * accessor. We identify this cases by analyzing the exception type, and when we discover that the
+     * actual problem was a data problem, we return the errorOutput GPDBWritable.
+     */
+    private boolean isDataException(IOException ex) {
+        return (ex instanceof EOFException || ex instanceof CharacterCodingException ||
+                ex instanceof CharConversionException || ex instanceof UTFDataFormatException ||
+                ex instanceof ZipException);
+    }
+
+    /*
+     * Writing is not supported by the read bridge
+     */
+    @Override
+    public boolean setNext(DataInputStream inputStream) {
+        throw new UnsupportedOperationException("setNext is not implemented");
+    }
+
+    /*
+     * The bridge is thread safe only when both the accessor and the resolver are
+     */
+    @Override
+    public boolean isThreadSafe() {
+        boolean result = ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+        LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
new file mode 100644
index 0000000..34ed316
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
@@ -0,0 +1,96 @@
+package org.apache.hawq.pxf.service;
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.util.List;
+
+/**
+ * WriteBridge class creates the appropriate accessor and resolver.
+ * It reads a record from the input stream, converts it with the resolver,
+ * and writes it to the Hadoop storage with the accessor.
+ */
+public class WriteBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(WriteBridge.class);
+    WriteAccessor fileAccessor = null;
+    WriteResolver fieldsResolver = null;
+    BridgeInputBuilder inputBuilder;
+
+    /**
+     * C'tor - set the implementation of the bridge: creates the input builder
+     * and the accessor and resolver plugins named by the request.
+     *
+     * @param protocolData request data, also handed to the plugins as their InputData
+     * @throws Exception if the input builder or one of the plugins cannot be created
+     */
+    public WriteBridge(ProtocolData protocolData) throws Exception {
+        inputBuilder = new BridgeInputBuilder(protocolData);
+        /* plugins accept InputData parameters */
+        fileAccessor = getFileAccessor(protocolData);
+        fieldsResolver = getFieldsResolver(protocolData);
+    }
+
+    /**
+     * Opens the accessor for writing.
+     *
+     * @return the result of {@link WriteAccessor#openForWrite()}
+     * @throws Exception if the accessor fails to open
+     */
+    public boolean beginIteration() throws Exception {
+        return fileAccessor.openForWrite();
+    }
+
+    /**
+     * Reads data from the stream, converts it using the WriteResolver into a OneRow
+     * object, and passes it to the WriteAccessor to write.
+     *
+     * @param inputStream stream to read the next record from
+     * @return true if a record was written; false if the input builder returned no
+     *         record or the resolver returned no row (the accessor is closed in both cases)
+     * @throws BadRecordException if the accessor reports that the row was not written
+     * @throws Exception if reading, resolving, writing or closing fails
+     */
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+        if (record == null) {
+            close();
+            return false;
+        }
+
+        OneRow onerow = fieldsResolver.setFields(record);
+        if (onerow == null) {
+            close();
+            return false;
+        }
+        if (!fileAccessor.writeNextObject(onerow)) {
+            close();
+            throw new BadRecordException();
+        }
+        return true;
+    }
+
+    /*
+     * Closes the accessor. A failure is logged and rethrown to the caller.
+     */
+    private void close() throws Exception {
+        try {
+            fileAccessor.closeForWrite();
+        } catch (Exception e) {
+            // Pass the throwable as well, so the log keeps the stack trace and not
+            // only the (possibly null) message.
+            LOG.error("Failed to close bridge resources: " + e.getMessage(), e);
+            throw e;
+        }
+    }
+
+    /*
+     * Creates the write accessor named by inputData.getAccessor().
+     */
+    private static WriteAccessor getFileAccessor(InputData inputData) throws Exception {
+        return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    }
+
+    /*
+     * Creates the write resolver named by inputData.getResolver().
+     */
+    private static WriteResolver getFieldsResolver(InputData inputData) throws Exception {
+        return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    }
+
+    /**
+     * Not supported by the write bridge.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Writable getNext() {
+        throw new UnsupportedOperationException("getNext is not implemented");
+    }
+
+    /**
+     * Reports whether both plugins are thread safe. The resolver is consulted
+     * only when the accessor already reported thread safety.
+     *
+     * @return true if the accessor and the resolver are both thread safe
+     */
+    @Override
+    public boolean isThreadSafe() {
+        return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
new file mode 100644
index 0000000..e74c88b
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
@@ -0,0 +1,57 @@
+package org.apache.hawq.pxf.service.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+
+/**
+ * A serializable object for transporting a byte array through the Bridge
+ * framework. Only serialization is supported; deserialization is not.
+ */
+public class BufferWritable implements Writable {
+
+    byte[] buf;
+
+    /**
+     * Constructs a BufferWritable around an existing buffer. Only the reference
+     * is kept, the bytes are not copied, so the buffer can travel between the
+     * Bridge objects without being duplicated at every hand-off.
+     *
+     * @param inBuf buffer
+     */
+    public BufferWritable(byte[] inBuf) {
+        this.buf = inBuf;
+    }
+
+    /**
+     * Writes the whole buffer to <code>out</code>.
+     *
+     * @param out <code>DataOutput</code> to serialize this object into.
+     * @throws IOException if the buffer was not set
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (buf != null) {
+            out.write(buf);
+            return;
+        }
+        throw new IOException("BufferWritable was not set");
+    }
+
+    /**
+     * Deserialization is not supported by this class.
+     *
+     * @param in <code>DataInput</code> to deserialize this object from
+     * @throws UnsupportedOperationException this function is not supported
+     */
+    @Override
+    public void readFields(DataInput in) {
+        throw new UnsupportedOperationException(
+                "BufferWritable.readFields() is not implemented");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java
new file mode 100644
index 0000000..1cce070
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/GPDBWritable.java
@@ -0,0 +1,873 @@
+package org.apache.hawq.pxf.service.io;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.Arrays;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+
+
+/**
+ * This class represents a GPDB record in the form of
+ * a Java object.
+ */
+public class GPDBWritable implements Writable {
+ /*
+ * GPDBWritable is using the following serialization form:
+ * Total Length | Version | Error Flag | # of columns | Col type |...| Col type | Null Bit array | Col val...
+ * 4 byte | 2 byte | 1 byte | 2 byte | 1 byte |...| 1 byte | ceil(# of columns/8) byte | Fixed or Var length
+ *
+ * For fixed length type, we know the length.
+ * In the col val, we align pad according to the alignment requirement of the type.
+ * For var length type, the alignment is always 4 byte.
+ * For var length type, col val is <4 byte length><payload val>
+ */
+
+ private static Log Log = LogFactory.getLog(GPDBWritable.class);
+ private static final int EOF = -1;
+
+ /*
+  * Enum of the Database type.
+  * The declaration order is part of the serialized form: write() emits ordinal()
+  * as the one byte column type and readFields() matches the byte it reads
+  * against ordinal(), so the constants must not be reordered.
+  * Arguments are (alignment, type length).
+  */
+ private enum DBType {
+     BIGINT(8, 8),
+     BOOLEAN(1, 1),
+     FLOAT8(8, 8),
+     INTEGER(4, 4),
+     REAL(4, 4),
+     SMALLINT(2, 2),
+     BYTEA(4, -1),
+     TEXT(4, -1);
+
+     private final int alignment;
+     private final int typelength; // -1 means var length
+
+     DBType(int align, int len) {
+         alignment = align;
+         typelength = len;
+     }
+
+     // return the alignment requirement of the type
+     public int getAlignment() {
+         return alignment;
+     }
+
+     // return the fixed length of the type, or -1 for a var length type
+     public int getTypeLength() {
+         return typelength;
+     }
+
+     public boolean isVarLength() {
+         return getTypeLength() == -1;
+     }
+ }
+
+ /*
+ * Constants
+ */
+ private static final int PREV_VERSION = 1;
+ private static final int VERSION = 2; /* for backward compatibility */
+ private static final String CHARSET = "UTF-8";
+
+ /*
+ * Local variables
+ */
+ protected int[] colType;
+ protected Object[] colValue;
+ protected int alignmentOfEightBytes = 8;
+ protected byte errorFlag = 0;
+ protected int pktlen = EOF;
+
+ /**
+ * Returns the column type OIDs of this record.
+ * The internal array itself is returned, not a copy; it is null until a
+ * schema was supplied through a constructor or read by readFields.
+ *
+ * @return the column type OIDs
+ */
+ public int[] getColType() {
+ return colType;
+ }
+
+ /**
+  * An exception class for column type definition and
+  * set/get value mismatch.
+  * <p>
+  * Declared as a static nested class so that the exception does not keep a
+  * hidden reference to the GPDBWritable that raised it; such a reference would
+  * pin the record in memory and make the exception impossible to serialize,
+  * because GPDBWritable is not Serializable.
+  * </p>
+  */
+ public static class TypeMismatchException extends IOException {
+     private static final long serialVersionUID = 1L;
+
+     /**
+      * @param msg description of the mismatch
+      */
+     public TypeMismatchException(String msg) {
+         super(msg);
+     }
+ }
+
+ /**
+ * Empty Constructor.
+ * No schema is set (colType and colValue stay null) and the packet length
+ * keeps its initial EOF value, so isEmpty() returns true until readFields
+ * reads a record.
+ */
+ public GPDBWritable() {
+ initializeEightByteAlignment();
+ }
+
+ /**
+ * Constructor to build a db record. colType defines the schema.
+ * The array is kept by reference (not copied) and every column value starts
+ * as null.
+ *
+ * @param columnType the table column types, as type OIDs
+ */
+ public GPDBWritable(int[] columnType) {
+ initializeEightByteAlignment();
+ colType = columnType;
+ colValue = new Object[columnType.length];
+ }
+
+ /**
+  * Constructor to build a db record from its serialized form. The bytes are
+  * wrapped in an in-memory stream and parsed by readFields.
+  *
+  * @param data a record in the serialized form
+  * @throws IOException if the data is malformed
+  */
+ public GPDBWritable(byte[] data) throws IOException {
+     initializeEightByteAlignment();
+     readFields(new DataInputStream(new ByteArrayInputStream(data)));
+ }
+
+ /*
+  * Reads the first 4 bytes of a packet, the packet length, and stores it in pktlen.
+  * When the stream is already exhausted (EOFException) pktlen is left as EOF and
+  * EOF is returned. A length that is itself -1 is returned as is and only logged.
+  */
+ private int readPktLen(DataInput in) throws IOException {
+     // mark the object as empty until a length was actually read
+     pktlen = EOF;
+
+     int length;
+     try {
+         length = in.readInt();
+     } catch (EOFException e) {
+         Log.debug("Reached end of stream (EOFException)");
+         return EOF;
+     }
+
+     pktlen = length;
+     if (length == EOF) {
+         Log.debug("Reached end of stream (returned -1)");
+     }
+     return length;
+ }
+
+ /**
+  * Deserializes one record from <code>in</code>, replacing the column types and
+  * column values of this object. When the stream is already at its end nothing
+  * else is read and the object stays empty (see {@link #isEmpty()}).
+  *
+  * @param in <code>DataInput</code> to deserialize this object from
+  * @throws IOException if the version is not supported, the column count, a column
+  *                     type or a variable length is invalid, the stream ends inside
+  *                     the record, or the record carries a non-zero error flag
+  */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+     /*
+      * extract pkt len.
+      *
+      * GPSQL-1107:
+      * The DataInput might already be empty (EOF), but we can't check it beforehand.
+      * If that's the case, pktlen is updated to -1, to mark that the object is still empty.
+      * (can be checked with isEmpty()).
+      */
+     pktlen = readPktLen(in);
+     if (isEmpty()) {
+         return;
+     }
+
+     /* extract the version and col cnt */
+     int version = in.readShort();
+     int curOffset = 4 + 2;
+
+     /* !!! Check VERSION !!! */
+     if (version != GPDBWritable.VERSION && version != GPDBWritable.PREV_VERSION) {
+         throw new IOException("Current GPDBWritable version(" +
+                 GPDBWritable.VERSION + ") does not match input version(" +
+                 version + ")");
+     }
+
+     /* only the current version carries the error flag byte */
+     if (version == GPDBWritable.VERSION) {
+         errorFlag = in.readByte();
+         curOffset += 1;
+     }
+
+     int colCnt = in.readShort();
+     curOffset += 2;
+     if (colCnt < 0) {
+         /* report malformed input as IOException instead of NegativeArraySizeException */
+         throw new IOException("Invalid GPDBWritable column count: " + colCnt);
+     }
+
+     /* Extract Column Type: one byte per column holding the DBType ordinal */
+     colType = new int[colCnt];
+     DBType[] coldbtype = new DBType[colCnt];
+     DBType[] knownTypes = DBType.values();
+     for (int i = 0; i < colCnt; i++) {
+         int enumType = in.readByte();
+         curOffset += 1;
+         if (enumType < 0 || enumType >= knownTypes.length) {
+             throw new IOException("Unknown GPDBWritable.DBType ordinal value");
+         }
+         coldbtype[i] = knownTypes[enumType];
+         colType[i] = oidOf(coldbtype[i]);
+     }
+
+     /* Extract null bit array */
+     byte[] nullbytes = new byte[getNullByteArraySize(colCnt)];
+     in.readFully(nullbytes);
+     curOffset += nullbytes.length;
+     boolean[] colIsNull = byteArrayToBooleanArray(nullbytes, colCnt);
+
+     /* extract column value */
+     colValue = new Object[colCnt];
+     for (int i = 0; i < colCnt; i++) {
+         if (colIsNull[i]) {
+             /* null columns occupy no space in the value section */
+             continue;
+         }
+
+         /* Skip the alignment padding */
+         int skipbytes = roundUpAlignment(curOffset, coldbtype[i].getAlignment()) - curOffset;
+         skipPadding(in, skipbytes);
+         curOffset += skipbytes;
+
+         /* For fixed length type, increment the offset according to the type length here.
+          * For var length type (BYTEA, TEXT), we'll read 4 byte length header and the
+          * actual payload.
+          */
+         int varcollen = -1;
+         if (coldbtype[i].isVarLength()) {
+             varcollen = in.readInt();
+             if (varcollen < 0) {
+                 throw new IOException("Invalid length " + varcollen +
+                         " for variable length column " + i);
+             }
+             curOffset += 4 + varcollen;
+         } else {
+             curOffset += coldbtype[i].getTypeLength();
+         }
+
+         switch (DataType.get(colType[i])) {
+             case BIGINT: {
+                 colValue[i] = in.readLong();
+                 break;
+             }
+             case BOOLEAN: {
+                 colValue[i] = in.readBoolean();
+                 break;
+             }
+             case FLOAT8: {
+                 colValue[i] = in.readDouble();
+                 break;
+             }
+             case INTEGER: {
+                 colValue[i] = in.readInt();
+                 break;
+             }
+             case REAL: {
+                 colValue[i] = in.readFloat();
+                 break;
+             }
+             case SMALLINT: {
+                 colValue[i] = in.readShort();
+                 break;
+             }
+
+             /* For BYTEA column, it has a 4 byte var length header. */
+             case BYTEA: {
+                 colValue[i] = new byte[varcollen];
+                 in.readFully((byte[]) colValue[i]);
+                 break;
+             }
+             /* For text formatted column, it has a 4 byte var length header
+              * and it's always null terminated string.
+              * So, we can remove the last "\0" when constructing the string.
+              */
+             case TEXT: {
+                 if (varcollen == 0) {
+                     /* without the terminator there is no last byte to strip */
+                     throw new IOException("Text column " + i +
+                             " is missing its null terminator");
+                 }
+                 byte[] data = new byte[varcollen];
+                 in.readFully(data, 0, varcollen);
+                 colValue[i] = new String(data, 0, varcollen - 1, CHARSET);
+                 break;
+             }
+
+             default:
+                 throw new IOException("Unknown GPDBWritable ColType");
+         }
+     }
+
+     /* Skip the ending alignment padding */
+     skipPadding(in, roundUpAlignment(curOffset, 8) - curOffset);
+
+     /* the whole record is consumed before the error flag is reported */
+     if (errorFlag != 0) {
+         throw new IOException("Received error value " + errorFlag + " from format");
+     }
+ }
+
+ /*
+  * Maps a serialized column type to the type OID stored in colType.
+  */
+ private static int oidOf(DBType dbType) {
+     switch (dbType) {
+         case BIGINT:
+             return DataType.BIGINT.getOID();
+         case BOOLEAN:
+             return DataType.BOOLEAN.getOID();
+         case FLOAT8:
+             return DataType.FLOAT8.getOID();
+         case INTEGER:
+             return DataType.INTEGER.getOID();
+         case REAL:
+             return DataType.REAL.getOID();
+         case SMALLINT:
+             return DataType.SMALLINT.getOID();
+         case BYTEA:
+             return DataType.BYTEA.getOID();
+         default:
+             return DataType.TEXT.getOID();
+     }
+ }
+
+ /*
+  * Consumes count padding bytes from the stream, one byte at a time.
+  */
+ private static void skipPadding(DataInput in, int count) throws IOException {
+     for (int j = 0; j < count; j++) {
+         in.readByte();
+     }
+ }
+
+ /**
+ * Serializes this record to <code>out</code> in the GPDBWritable form:
+ * total length, version, error flag, number of columns, one type byte per
+ * column, the null bit array, the aligned column values and the end padding.
+ * The record is always written with the current VERSION.
+ * The total length is computed in a first pass over the columns, because it
+ * has to be written before any of the values.
+ *
+ * @param out <code>DataOutput</code> to serialize this object into
+ * @throws IOException if writing to <code>out</code> fails
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ int numCol = colType.length;
+ boolean[] nullBits = new boolean[numCol];
+ int[] colLength = new int[numCol];
+ byte[] enumType = new byte[numCol];
+ int[] padLength = new int[numCol];
+ // zero filled source for the alignment and end padding bytes
+ byte[] padbytes = new byte[8];
+
+ /**
+ * Compute the total payload and header length
+ * header = total length (4 byte), Version (2 byte), Error (1 byte), #col (2 byte)
+ * col type array = #col * 1 byte
+ * null bit array = ceil(#col/8)
+ */
+ int datlen = 4 + 2 + 1 + 2;
+ datlen += numCol;
+ datlen += getNullByteArraySize(numCol);
+
+ // First pass: pick the serialized type of every column and accumulate the length
+ for (int i = 0; i < numCol; i++) {
+ /* Get the enum type */
+ DBType coldbtype;
+ switch (DataType.get(colType[i])) {
+ case BIGINT:
+ coldbtype = DBType.BIGINT;
+ break;
+ case BOOLEAN:
+ coldbtype = DBType.BOOLEAN;
+ break;
+ case FLOAT8:
+ coldbtype = DBType.FLOAT8;
+ break;
+ case INTEGER:
+ coldbtype = DBType.INTEGER;
+ break;
+ case REAL:
+ coldbtype = DBType.REAL;
+ break;
+ case SMALLINT:
+ coldbtype = DBType.SMALLINT;
+ break;
+ case BYTEA:
+ coldbtype = DBType.BYTEA;
+ break;
+ default:
+ // every other type is serialized in text form
+ coldbtype = DBType.TEXT;
+ }
+ // the ordinal of the DBType is the type byte of the serialized form
+ enumType[i] = (byte) (coldbtype.ordinal());
+
+ /* Get the actual value, and set the null bit */
+ if (colValue[i] == null) {
+ // a null column contributes neither padding nor payload
+ nullBits[i] = true;
+ colLength[i] = 0;
+ } else {
+ nullBits[i] = false;
+
+ /*
+ * For fixed length type, we get the fixed length.
+ * For var len binary format, the length is in the col value.
+ * For text format, we must convert encoding first.
+ */
+ if (!coldbtype.isVarLength()) {
+ colLength[i] = coldbtype.getTypeLength();
+ } else if (!isTextForm(colType[i])) {
+ colLength[i] = ((byte[]) colValue[i]).length;
+ } else {
+ colLength[i] = ((String) colValue[i]).getBytes(CHARSET).length;
+ }
+
+ /* calculate and add the type alignment padding */
+ padLength[i] = roundUpAlignment(datlen, coldbtype.getAlignment()) - datlen;
+ datlen += padLength[i];
+
+ /* for variable length type, we add a 4 byte length header */
+ if (coldbtype.isVarLength()) {
+ datlen += 4;
+ }
+ }
+ datlen += colLength[i];
+ }
+
+ /*
+ * Add the final alignment padding for the next record
+ */
+ int endpadding = roundUpAlignment(datlen, 8) - datlen;
+ datlen += endpadding;
+
+ /* Construct the packet header */
+ out.writeInt(datlen);
+ out.writeShort(VERSION);
+ out.writeByte(errorFlag);
+ out.writeShort(numCol);
+
+ /* Write col type */
+ for (int i = 0; i < numCol; i++) {
+ out.writeByte(enumType[i]);
+ }
+
+ /* Nullness */
+ byte[] nullBytes = boolArrayToByteArray(nullBits);
+ out.write(nullBytes);
+
+ /* Column Value */
+ for (int i = 0; i < numCol; i++) {
+ if (!nullBits[i]) {
+ /* Pad the alignment byte first */
+ if (padLength[i] > 0) {
+ out.write(padbytes, 0, padLength[i]);
+ }
+
+ /* Now, write the actual column value */
+ switch (DataType.get(colType[i])) {
+ case BIGINT:
+ out.writeLong(((Long) colValue[i]));
+ break;
+ case BOOLEAN:
+ out.writeBoolean(((Boolean) colValue[i]));
+ break;
+ case FLOAT8:
+ out.writeDouble(((Double) colValue[i]));
+ break;
+ case INTEGER:
+ out.writeInt(((Integer) colValue[i]));
+ break;
+ case REAL:
+ out.writeFloat(((Float) colValue[i]));
+ break;
+ case SMALLINT:
+ out.writeShort(((Short) colValue[i]));
+ break;
+
+ /* For BYTEA format, add 4byte length header at the beginning */
+ case BYTEA:
+ out.writeInt(colLength[i]);
+ out.write((byte[]) colValue[i]);
+ break;
+
+ /* For text format, add 4byte length header. string is already '\0' terminated */
+ default: {
+ out.writeInt(colLength[i]);
+ byte[] data = ((String) colValue[i]).getBytes(CHARSET);
+ out.write(data);
+ break;
+ }
+ }
+ }
+ }
+
+ /* End padding */
+ out.write(padbytes, 0, endpadding);
+ }
+
+ /**
+  * Private helper to pack a boolean array into a byte array, eight flags per
+  * byte, the first flag of each group in the most significant bit.
+  */
+ private static byte[] boolArrayToByteArray(boolean[] data) {
+     byte[] packed = new byte[getNullByteArraySize(data.length)];
+     for (int bit = 0; bit < data.length; bit++) {
+         if (data[bit]) {
+             packed[bit / 8] |= 1 << (7 - bit % 8);
+         }
+     }
+     return packed;
+ }
+
+ /**
+  * Private helper to determine the size of the null byte array:
+  * one bit per column, rounded up to whole bytes.
+  */
+ private static int getNullByteArraySize(int colCnt) {
+     int fullBytes = colCnt / 8;
+     if (colCnt % 8 == 0) {
+         return fullBytes;
+     }
+     return fullBytes + 1;
+ }
+
+ /**
+  * Private helper to unpack the first colCnt bits of a byte array into a boolean
+  * array, reading each byte from its most significant bit down.
+  */
+ private static boolean[] byteArrayToBooleanArray(byte[] data, int colCnt) {
+     boolean[] flags = new boolean[colCnt];
+     for (int bit = 0; bit < flags.length; bit++) {
+         int mask = 1 << (7 - bit % 8);
+         flags[bit] = (data[bit / 8] & mask) != 0;
+     }
+     return flags;
+ }
+
+ /**
+  * Private helper to round the given length up to the next multiple of the
+  * alignment. A requested alignment of 8 is replaced by alignmentOfEightBytes.
+  */
+ private int roundUpAlignment(int len, int align) {
+     int effective = (align == 8) ? alignmentOfEightBytes : align;
+     int mask = effective - 1;
+     return (len + mask) & ~mask;
+ }
+
+ /**
+ * Getter/Setter methods to get/set the column value
+ */
+
+ /**
+ * Sets the value of a BIGINT column.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not BIGINT
+ */
+ public void setLong(int colIdx, Long val)
+ throws TypeMismatchException {
+ checkType(BIGINT, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+ * Sets the value of a BOOLEAN column.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not BOOLEAN
+ */
+ public void setBoolean(int colIdx, Boolean val)
+ throws TypeMismatchException {
+ checkType(BOOLEAN, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+ * Sets the value of a BYTEA column. The array is stored by reference, not copied.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not BYTEA
+ */
+ public void setBytes(int colIdx, byte[] val)
+ throws TypeMismatchException {
+ checkType(BYTEA, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+  * Sets the value of a text form column, that is any column whose type is not
+  * one of the fixed length types or BYTEA. A non-null value is stored with a
+  * trailing "\0" appended, which is the terminator the serialized form expects.
+  *
+  * @param colIdx the column index
+  * @param val the value
+  * @throws TypeMismatchException the column type does not match
+  */
+ public void setString(int colIdx, String val)
+         throws TypeMismatchException {
+     checkType(TEXT, colIdx, true);
+     colValue[colIdx] = (val == null) ? null : val + "\0";
+ }
+
+ /**
+ * Sets the value of a REAL column.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not REAL
+ */
+ public void setFloat(int colIdx, Float val)
+ throws TypeMismatchException {
+ checkType(REAL, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+ * Sets the value of a FLOAT8 column.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not FLOAT8
+ */
+ public void setDouble(int colIdx, Double val)
+ throws TypeMismatchException {
+ checkType(FLOAT8, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+ * Sets the value of an INTEGER column.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not INTEGER
+ */
+ public void setInt(int colIdx, Integer val)
+ throws TypeMismatchException {
+ checkType(INTEGER, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+ * Sets the value of a SMALLINT column.
+ *
+ * @param colIdx the zero-based column index
+ * @param val the value; null marks the column as null when the record is written
+ * @throws TypeMismatchException if the index is out of range or the column is not SMALLINT
+ */
+ public void setShort(int colIdx, Short val)
+ throws TypeMismatchException {
+ checkType(SMALLINT, colIdx, true);
+ colValue[colIdx] = val;
+ }
+
+ /**
+ * Gets the value of a BIGINT column.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not BIGINT
+ */
+ public Long getLong(int colIdx)
+ throws TypeMismatchException {
+ checkType(BIGINT, colIdx, false);
+ return (Long) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of a BOOLEAN column.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not BOOLEAN
+ */
+ public Boolean getBoolean(int colIdx)
+ throws TypeMismatchException {
+ checkType(BOOLEAN, colIdx, false);
+ return (Boolean) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of a BYTEA column. The stored array itself is returned, not a copy.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not BYTEA
+ */
+ public byte[] getBytes(int colIdx)
+ throws TypeMismatchException {
+ checkType(BYTEA, colIdx, false);
+ return (byte[]) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of a text form column, exactly as it is stored.
+ * A value stored through setString still carries the "\0" that setter
+ * appends; a value populated by readFields has its terminator removed.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not in text form
+ */
+ public String getString(int colIdx)
+ throws TypeMismatchException {
+ checkType(TEXT, colIdx, false);
+ return (String) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of a REAL column.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not REAL
+ */
+ public Float getFloat(int colIdx)
+ throws TypeMismatchException {
+ checkType(REAL, colIdx, false);
+ return (Float) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of a FLOAT8 column.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not FLOAT8
+ */
+ public Double getDouble(int colIdx)
+ throws TypeMismatchException {
+ checkType(FLOAT8, colIdx, false);
+ return (Double) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of an INTEGER column.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not INTEGER
+ */
+ public Integer getInt(int colIdx)
+ throws TypeMismatchException {
+ checkType(INTEGER, colIdx, false);
+ return (Integer) colValue[colIdx];
+ }
+
+ /**
+ * Gets the value of a SMALLINT column.
+ *
+ * @param colIdx the zero-based column index
+ * @return column value, or null if the column holds no value
+ * @throws TypeMismatchException if the index is out of range or the column is not SMALLINT
+ */
+ public Short getShort(int colIdx)
+ throws TypeMismatchException {
+ checkType(SMALLINT, colIdx, false);
+ return (Short) colValue[colIdx];
+ }
+
+ /**
+  * Sets the error field that is written into the record header:
+  * 1 for an error record, 0 otherwise.
+  *
+  * @param errorVal the error value
+  */
+ public void setError(boolean errorVal) {
+     if (errorVal) {
+         errorFlag = 1;
+     } else {
+         errorFlag = 0;
+     }
+ }
+
+ /**
+ * Returns a string representation of the object.
+ */
+ @Override
+ public String toString() {
+ if (colType == null) {
+ return null;
+ }
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < colType.length; i++) {
+ result.append("Column ").append(i).append(":");
+ if (colValue[i] != null) {
+ result.append(colType[i] == BYTEA.getOID()
+ ? byteArrayInString((byte[]) colValue[i])
+ : colValue[i]);
+ }
+ result.append("\n");
+ }
+ return result.toString();
+ }
+
+ /**
+  * Helper printing function: renders every byte as its signed decimal value
+  * followed by a space.
+  */
+ private static String byteArrayInString(byte[] data) {
+     StringBuilder text = new StringBuilder();
+     for (int i = 0; i < data.length; i++) {
+         text.append((int) data[i]);
+         text.append(" ");
+     }
+     return text.toString();
+ }
+
+ /**
+  * Private Helper to check the type mismatch.
+  * A column that is stored in text form can only be accessed as TEXT
+  * (setString/getString); for any other column the requested type must be
+  * exactly the column type.
+  */
+ private void checkType(DataType inTyp, int idx, boolean isSet)
+         throws TypeMismatchException {
+     boolean inRange = idx >= 0 && idx < colType.length;
+     if (!inRange) {
+         throw new TypeMismatchException("Column index is out of range");
+     }
+
+     int exTyp = colType[idx];
+     boolean textColumn = isTextForm(exTyp);
+
+     if (textColumn && inTyp != TEXT) {
+         throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), TEXT.getOID(), isSet));
+     }
+     if (!textColumn && inTyp != DataType.get(exTyp)) {
+         throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), exTyp, isSet));
+     }
+ }
+
+ /*
+  * Builds the type mismatch message for a set or a get attempt.
+  */
+ private String formErrorMsg(int inTyp, int colTyp, boolean isSet) {
+     String action = isSet ? "Cannot set " : "Cannot get ";
+     String preposition = isSet ? " to a " : " from a ";
+     return action + getTypeName(inTyp) + preposition + getTypeName(colTyp) + " column";
+ }
+
+ /**
+  * Private Helper routine to tell whether a type is Text form or not.
+  * Every type except the fixed length types and BYTEA is in text form.
+  *
+  * @param type the type OID that we want to check
+  */
+ private boolean isTextForm(int type) {
+     DataType actual = DataType.get(type);
+     DataType[] binaryTypes = {BIGINT, BOOLEAN, BYTEA, FLOAT8, INTEGER, REAL, SMALLINT};
+     for (DataType binary : binaryTypes) {
+         if (binary == actual) {
+             return false;
+         }
+     }
+     return true;
+ }
+
+ /**
+ * Helper to get the type name.
+ * If a given oid is not in the commonly used list, we
+ * would expect a TEXT for it (for the error message).
+ *
+ * @param oid type OID
+ * @return type name
+ */
+ public static String getTypeName(int oid) {
+ switch (DataType.get(oid)) {
+ case BOOLEAN:
+ return "BOOLEAN";
+ case BYTEA:
+ return "BYTEA";
+ case CHAR:
+ return "CHAR";
+ case BIGINT:
+ return "BIGINT";
+ case SMALLINT:
+ return "SMALLINT";
+ case INTEGER:
+ return "INTEGER";
+ case TEXT:
+ return "TEXT";
+ case REAL:
+ return "REAL";
+ case FLOAT8:
+ return "FLOAT8";
+ case BPCHAR:
+ return "BPCHAR";
+ case VARCHAR:
+ return "VARCHAR";
+ case DATE:
+ return "DATE";
+ case TIME:
+ return "TIME";
+ case TIMESTAMP:
+ return "TIMESTAMP";
+ case NUMERIC:
+ return "NUMERIC";
+ default:
+ return "TEXT";
+ }
+ }
+
+ /*
+  * Get alignment from command line to match to the alignment
+  * the C code uses (see gphdfs/src/protocol_formatter/common.c).
+  * The value is read from the "greenplum.alignment" system property; when the
+  * property is not set the default alignment is kept.
+  * NOTE(review): the value is not validated; roundUpAlignment's mask arithmetic
+  * presumably needs a power of two -- confirm.
+  */
+ private void initializeEightByteAlignment() {
+     String alignment = System.getProperty("greenplum.alignment");
+     if (alignment != null) {
+         alignmentOfEightBytes = Integer.parseInt(alignment);
+     }
+ }
+
+ /**
+ * Returns if the writable object is empty,
+ * based on the pkt len as read from stream.
+ * -1 means nothing was read (eof).
+ * The pkt len is only updated when a record is read, so an object that was
+ * built through a constructor and never read from a stream is also reported
+ * as empty.
+ *
+ * @return whether the writable object is empty
+ */
+ public boolean isEmpty() {
+ return pktlen == EOF;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java
new file mode 100644
index 0000000..cdea1de
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Text.java
@@ -0,0 +1,379 @@
+package org.apache.hawq.pxf.service.io;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.*;
+import java.util.Arrays;
+
+/**
+ * Mutable container for text held as UTF-8 encoded bytes. It provides helpers
+ * to encode and decode strings and to read one line at a time from a stream.
+ * <p>
+ * {@link #write(DataOutput)} emits the valid bytes as they are, without any
+ * length prefix. {@link #readFields(DataInput)} consumes bytes up to and
+ * including the next line delimiter, or until the end of the stream.
+ * <p>
+ * Only the first {@link #getLength()} bytes of {@link #getBytes()} are valid;
+ * the backing array may be larger than the content.
+ */
+public class Text implements Writable {
+
+    private static final Log LOG = LogFactory.getLog(Text.class);
+    private static final char LINE_DELIMITER = '\n';
+    private static final int BUF_SIZE = 1024;
+    private static final int EOF = -1;
+    private static final byte[] EMPTY_BYTES = new byte[0];
+
+    // One encoder per thread; its resting configuration is REPORT.
+    private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
+        @Override
+        protected CharsetEncoder initialValue() {
+            return Charset.forName("UTF-8").newEncoder().onMalformedInput(
+                    CodingErrorAction.REPORT).onUnmappableCharacter(
+                    CodingErrorAction.REPORT);
+        }
+    };
+    // One decoder per thread; its resting configuration is REPORT.
+    private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
+        @Override
+        protected CharsetDecoder initialValue() {
+            return Charset.forName("UTF-8").newDecoder().onMalformedInput(
+                    CodingErrorAction.REPORT).onUnmappableCharacter(
+                    CodingErrorAction.REPORT);
+        }
+    };
+
+    // Staging buffer used by readFields(); allocated on first use so that
+    // every constructor yields an object that can be read into.
+    private byte[] buf;
+    // Number of bytes currently staged in buf.
+    int curLoc;
+    // Backing storage; only the first 'length' bytes are valid.
+    private byte[] bytes;
+    private int length;
+
+    /**
+     * Constructs an empty text.
+     */
+    public Text() {
+        bytes = EMPTY_BYTES;
+    }
+
+    /**
+     * Construct from a string.
+     *
+     * @param string input string
+     */
+    public Text(String string) {
+        set(string);
+    }
+
+    /**
+     * Construct from another text.
+     *
+     * @param utf8 text to copy
+     */
+    public Text(Text utf8) {
+        set(utf8);
+    }
+
+    /**
+     * Construct from a byte array.
+     *
+     * @param utf8 input byte array
+     */
+    public Text(byte[] utf8) {
+        set(utf8);
+    }
+
+    /**
+     * Tells whether the first byte of an encoded variable-length integer
+     * marks a negative value.
+     *
+     * @param value first byte of the encoded value
+     * @return true if value is below -120 or in the range [-112, 0)
+     */
+    public static boolean isNegativeVInt(byte value) {
+        return value < -120 || (value >= -112 && value < 0);
+    }
+
+    /**
+     * Reads a variable-length encoded long from the stream. A first byte that
+     * decodes to size 1 is the value itself; otherwise the following
+     * <code>size - 1</code> bytes are combined big-endian, and the result is
+     * bitwise inverted when the first byte marks a negative value.
+     *
+     * @param stream input to read from
+     * @return the decoded value
+     * @throws IOException if reading from the stream fails
+     */
+    public static long readVLong(DataInput stream) throws IOException {
+        byte firstByte = stream.readByte();
+        int len = decodeVIntSize(firstByte);
+        if (len == 1) {
+            return firstByte;
+        }
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++) {
+            byte b = stream.readByte();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+    }
+
+    /**
+     * Derives the total encoded size (in bytes, including the first byte) of
+     * a variable-length integer from its first byte.
+     *
+     * @param value first byte of the encoded value
+     * @return 1 if value is at least -112, <code>-119 - value</code> if value
+     *         is below -120, <code>-111 - value</code> otherwise
+     */
+    public static int decodeVIntSize(byte value) {
+        if (value >= -112) {
+            return 1;
+        } else if (value < -120) {
+            return -119 - value;
+        }
+        return -111 - value;
+    }
+
+    /**
+     * Converts the provided byte range to a String using the UTF-8 encoding.
+     * Malformed input is replaced with the substitution character.
+     *
+     * @param utf8 UTF-8 encoded byte array
+     * @param start start point
+     * @param length number of bytes to decode
+     * @return decoded string
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static String decode(byte[] utf8, int start, int length)
+            throws CharacterCodingException {
+        return decode(ByteBuffer.wrap(utf8, start, length), true);
+    }
+
+    /**
+     * Converts the provided byte array to a String using the UTF-8 encoding. If
+     * <code>replace</code> is true, then malformed input is replaced with the
+     * substitution character, which is U+FFFD. Otherwise the method throws a
+     * MalformedInputException.
+     *
+     * @param utf8 UTF-8 encoded byte array
+     * @param start start point
+     * @param length length of array
+     * @param replace whether to replace malformed input with substitution
+     *            character
+     * @return decoded string
+     * @throws MalformedInputException if a malformed input is used
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static String decode(byte[] utf8, int start, int length,
+                                boolean replace)
+            throws CharacterCodingException {
+        return decode(ByteBuffer.wrap(utf8, start, length), replace);
+    }
+
+    /*
+     * Decodes with this thread's decoder. In replace mode the decoder is
+     * switched to REPLACE for the call and always switched back to REPORT,
+     * even when decoding throws.
+     */
+    private static String decode(ByteBuffer utf8, boolean replace)
+            throws CharacterCodingException {
+        CharsetDecoder decoder = DECODER_FACTORY.get();
+        if (!replace) {
+            return decoder.decode(utf8).toString();
+        }
+        decoder.onMalformedInput(CodingErrorAction.REPLACE);
+        decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        try {
+            return decoder.decode(utf8).toString();
+        } finally {
+            // set decoder back to its default value: REPORT
+            decoder.onMalformedInput(CodingErrorAction.REPORT);
+            decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+        }
+    }
+
+    /**
+     * Converts the provided String to bytes using the UTF-8 encoding. If the
+     * input is malformed, invalid chars are replaced by a default value.
+     *
+     * @param string string to encode
+     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
+     *         ByteBuffer.limit()
+     * @throws CharacterCodingException if conversion failed
+     */
+    public static ByteBuffer encode(String string)
+            throws CharacterCodingException {
+        return encode(string, true);
+    }
+
+    /**
+     * Converts the provided String to bytes using the UTF-8 encoding. If
+     * <code>replace</code> is true, then malformed input is replaced with the
+     * substitution character, which is U+FFFD. Otherwise the method throws a
+     * MalformedInputException.
+     *
+     * @param string string to encode
+     * @param replace whether to replace malformed input with substitution character
+     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
+     *         ByteBuffer.limit()
+     * @throws MalformedInputException if a malformed input is used
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static ByteBuffer encode(String string, boolean replace)
+            throws CharacterCodingException {
+        CharsetEncoder encoder = ENCODER_FACTORY.get();
+        if (!replace) {
+            return encoder.encode(CharBuffer.wrap(string.toCharArray()));
+        }
+        encoder.onMalformedInput(CodingErrorAction.REPLACE);
+        encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        try {
+            return encoder.encode(CharBuffer.wrap(string.toCharArray()));
+        } finally {
+            // set encoder back to its default value: REPORT
+            encoder.onMalformedInput(CodingErrorAction.REPORT);
+            encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+        }
+    }
+
+    /**
+     * Returns the raw bytes; however, only data up to {@link #getLength()} is
+     * valid. The returned array is the internal storage, not a copy.
+     *
+     * @return raw bytes of byte array
+     */
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    /**
+     * Returns the number of valid bytes in the byte array.
+     *
+     * @return number of bytes in byte array
+     */
+    public int getLength() {
+        return length;
+    }
+
+    /**
+     * Sets to contain the contents of a string.
+     *
+     * @param string input string
+     */
+    public void set(String string) {
+        try {
+            ByteBuffer bb = encode(string, true);
+            // The encoder's array may be longer than the encoded content;
+            // limit() is the number of valid bytes.
+            bytes = bb.array();
+            length = bb.limit();
+        } catch (CharacterCodingException e) {
+            throw new RuntimeException("Should not have happened "
+                    + e.toString(), e);
+        }
+    }
+
+    /**
+     * Sets to a UTF-8 byte array.
+     *
+     * @param utf8 input UTF-8 byte array
+     */
+    public void set(byte[] utf8) {
+        set(utf8, 0, utf8.length);
+    }
+
+    /**
+     * Copies a text.
+     *
+     * @param other text object to copy.
+     */
+    public void set(Text other) {
+        set(other.getBytes(), 0, other.getLength());
+    }
+
+    /**
+     * Sets the Text to range of bytes.
+     *
+     * @param utf8 the data to copy from
+     * @param start the first position of the new string
+     * @param len the number of bytes of the new string
+     */
+    public void set(byte[] utf8, int start, int len) {
+        setCapacity(len, false);
+        System.arraycopy(utf8, start, bytes, 0, len);
+        this.length = len;
+    }
+
+    /**
+     * Appends a range of bytes to the end of the given text.
+     *
+     * @param utf8 the data to copy from
+     * @param start the first position to append from utf8
+     * @param len the number of bytes to append
+     */
+    public void append(byte[] utf8, int start, int len) {
+        setCapacity(length + len, true);
+        System.arraycopy(utf8, start, bytes, length, len);
+        length += len;
+    }
+
+    /**
+     * Clears the string to empty. The backing array is kept for reuse.
+     */
+    public void clear() {
+        length = 0;
+    }
+
+    /*
+     * Sets the capacity of this Text object to <em>at least</em>
+     * <code>len</code> bytes. If the current buffer is longer, then the
+     * capacity and existing content of the buffer are unchanged. If
+     * <code>len</code> is larger than the current capacity, the Text object's
+     * capacity is increased to exactly <code>len</code>.
+     *
+     * The exact (non-geometric) growth is kept on purpose: a caller that
+     * hands getBytes() of a freshly read Text onward without getLength()
+     * relies on the array holding nothing but the content.
+     *
+     * @param len the number of bytes we need
+     *
+     * @param keepData should the old data be kept
+     */
+    private void setCapacity(int len, boolean keepData) {
+        if (bytes == null || bytes.length < len) {
+            byte[] newBytes = new byte[len];
+            if (bytes != null && keepData) {
+                System.arraycopy(bytes, 0, newBytes, 0, length);
+            }
+            bytes = newBytes;
+        }
+    }
+
+    /**
+     * Convert text back to string
+     *
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        try {
+            return decode(bytes, 0, length);
+        } catch (CharacterCodingException e) {
+            throw new RuntimeException("Should not have happened "
+                    + e.toString(), e);
+        }
+    }
+
+    /**
+     * Writes the valid bytes to <code>out</code>, without a length prefix.
+     *
+     * @param out output to write to
+     * @throws IOException if writing fails
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.write(getBytes(), 0, getLength());
+    }
+
+    /**
+     * Replaces the content with the next line of the stream: all bytes up to
+     * and including the line delimiter, or up to the end of the stream if no
+     * delimiter is found. The content stays empty when the stream is already
+     * at its end.
+     *
+     * @param inputStream input to read from; must be a {@link DataInputStream}
+     * @throws IOException if reading from the stream fails
+     * @throws ClassCastException if inputStream is not a DataInputStream
+     */
+    @Override
+    public void readFields(DataInput inputStream) throws IOException {
+        DataInputStream in = (DataInputStream) inputStream;
+        if (buf == null) {
+            buf = new byte[BUF_SIZE];
+        }
+        curLoc = 0;
+        clear();
+
+        // Keep the result of read() as an int: narrowing it to a byte first
+        // would make the data byte 0xFF indistinguishable from EOF (-1).
+        int next;
+        while ((next = in.read()) != EOF) {
+            buf[curLoc] = (byte) next;
+            curLoc++;
+
+            if (next == LINE_DELIMITER) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("read one line, size " + curLoc);
+                }
+                break;
+            }
+
+            if (isBufferFull()) {
+                flushBuffer();
+            }
+        }
+
+        if (!isBufferEmpty()) {
+            // the buffer doesn't end with a line break.
+            if (next == EOF) {
+                LOG.warn("Stream ended without line break");
+            }
+            flushBuffer();
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return (curLoc == 0);
+    }
+
+    private boolean isBufferFull() {
+        return (curLoc == BUF_SIZE);
+    }
+
+    /*
+     * Moves the staged bytes into the content and empties the staging buffer.
+     */
+    private void flushBuffer() {
+        append(buf, 0, curLoc);
+        curLoc = 0;
+    }
+
+    /**
+     * Returns true iff <code>o</code> is a Text with the same contents. Only
+     * the valid bytes are compared; spare capacity in either backing array is
+     * ignored.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Text)) {
+            return false;
+        }
+        Text other = (Text) o;
+        if (length != other.length) {
+            return false;
+        }
+        for (int i = 0; i < length; i++) {
+            if (bytes[i] != other.bytes[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Hashes the valid bytes only, consistently with {@link #equals(Object)}.
+     * The formula is the one of {@link Arrays#hashCode(byte[])}.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        for (int i = 0; i < length; i++) {
+            hash = 31 * hash + bytes[i];
+        }
+        return hash;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java
new file mode 100644
index 0000000..8741085
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/Writable.java
@@ -0,0 +1,30 @@
+package org.apache.hawq.pxf.service.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Contract for objects that can serialize themselves to a {@link DataOutput}
+ * and restore their state from a {@link DataInput}.
+ */
+public interface Writable {
+
+ /**
+ * Serializes the fields of this object to <code>out</code>.
+ *
+ * @param out <code>DataOutput</code> to serialize this object into.
+ * @throws IOException if an I/O error occurs
+ */
+ void write(DataOutput out) throws IOException;
+
+ /**
+ * Deserializes the fields of this object from <code>in</code>.
+ * <p>For efficiency, implementations should attempt to re-use storage in the
+ * existing object where possible.</p>
+ *
+ * @param in <code>DataInput</code> to deserialize this object from.
+ * @throws IOException if an I/O error occurs
+ */
+ void readFields(DataInput in) throws IOException;
+}