Posted to commits@hawq.apache.org by od...@apache.org on 2016/08/26 23:05:46 UTC

[3/6] incubator-hawq git commit: HAWQ-992. PXF Hive data type check in Fragmenter too restrictive.

HAWQ-992. PXF Hive data type check in Fragmenter too restrictive.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e2416f49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e2416f49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e2416f49

Branch: refs/heads/HAWQ-992
Commit: e2416f498ebb2be29712a1042c4e9bae99f523ff
Parents: 24f5e36
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Fri Aug 26 16:04:53 2016 -0700
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Fri Aug 26 16:04:53 2016 -0700

----------------------------------------------------------------------
 .../hawq/pxf/api/utilities/EnumHawqType.java    |  28 +-
 .../plugins/hive/HiveColumnarSerdeResolver.java |   2 +-
 .../plugins/hive/HiveInputFormatFragmenter.java |   4 +-
 .../hive/utilities/EnumHiveToHawqType.java      |  61 +-
 .../plugins/hive/utilities/HiveUtilities.java   |  45 +-
 .../hive/utilities/HiveUtilitiesTest.java       |  65 ++
 .../pxf/service/utilities/ProtocolData.java     |   5 +-
 .../org/apache/hawq/pxf/service/Bridge.java     |  40 +
 .../hawq/pxf/service/BridgeInputBuilder.java    |  71 ++
 .../hawq/pxf/service/BridgeOutputBuilder.java   | 394 ++++++++
 .../hawq/pxf/service/FragmenterFactory.java     |  37 +
 .../hawq/pxf/service/FragmentsResponse.java     |  89 ++
 .../pxf/service/FragmentsResponseFormatter.java | 157 ++++
 .../hawq/pxf/service/GPDBWritableMapper.java    | 135 +++
 .../pxf/service/MetadataFetcherFactory.java     |  36 +
 .../hawq/pxf/service/MetadataResponse.java      |  93 ++
 .../pxf/service/MetadataResponseFormatter.java  |  95 ++
 .../org/apache/hawq/pxf/service/ReadBridge.java | 179 ++++
 .../hawq/pxf/service/ReadSamplingBridge.java    | 131 +++
 .../apache/hawq/pxf/service/WriteBridge.java    | 117 +++
 .../hawq/pxf/service/io/BufferWritable.java     |  98 ++
 .../hawq/pxf/service/io/GPDBWritable.java       | 893 +++++++++++++++++++
 .../org/apache/hawq/pxf/service/io/Text.java    | 399 +++++++++
 .../apache/hawq/pxf/service/io/Writable.java    |  50 ++
 .../apache/hawq/pxf/service/package-info.java   |  23 +
 .../hawq/pxf/service/rest/BridgeResource.java   | 189 ++++
 .../pxf/service/rest/ClusterNodesResource.java  | 148 +++
 .../pxf/service/rest/FragmenterResource.java    | 154 ++++
 .../pxf/service/rest/InvalidPathResource.java   | 179 ++++
 .../hawq/pxf/service/rest/MetadataResource.java | 124 +++
 .../hawq/pxf/service/rest/RestResource.java     |  71 ++
 .../service/rest/ServletLifecycleListener.java  |  63 ++
 .../hawq/pxf/service/rest/VersionResource.java  |  88 ++
 .../hawq/pxf/service/rest/WritableResource.java | 174 ++++
 .../pxf/service/utilities/AnalyzeUtils.java     | 147 +++
 .../service/utilities/CustomWebappLoader.java   | 231 +++++
 .../pxf/service/utilities/Log4jConfigure.java   |  66 ++
 .../pxf/service/utilities/ProtocolData.java     | 491 ++++++++++
 .../hawq/pxf/service/utilities/SecureLogin.java |  61 ++
 .../hawq/pxf/service/utilities/SecuredHDFS.java | 114 +++
 40 files changed, 5509 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
index 01d40f0..f35fa5e 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumHawqType.java
@@ -56,23 +56,27 @@ public enum EnumHawqType {
     DateType("date", DataType.DATE),
     TimestampType("timestamp", DataType.TIMESTAMP),
     BoolType("bool", DataType.BOOLEAN),
-    NumericType("numeric", DataType.NUMERIC, (byte) 2, true),
+    NumericType("numeric", DataType.NUMERIC, (byte) 2, false),
     BpcharType("bpchar", DataType.BPCHAR, (byte) 1, true);
 
     private DataType dataType;
     private String typeName;
     private byte modifiersNum;
-    private boolean validateIntegerModifiers;
+    private boolean mandatoryModifiers;
 
     EnumHawqType(String typeName, DataType dataType) {
         this.typeName = typeName;
         this.dataType = dataType;
     }
 
-    EnumHawqType(String typeName, DataType dataType, byte modifiersNum, boolean validateIntegerModifiers) {
+    EnumHawqType(String typeName, DataType dataType, byte modifiersNum) {
         this(typeName, dataType);
         this.modifiersNum = modifiersNum;
-        this.validateIntegerModifiers = validateIntegerModifiers;
+    }
+
+    EnumHawqType(String typeName, DataType dataType, byte modifiersNum, boolean mandatoryModifiers) {
+        this(typeName, dataType, modifiersNum);
+        this.setMandatoryModifiers(mandatoryModifiers);
     }
 
     /**
@@ -93,19 +97,19 @@ public enum EnumHawqType {
 
     /**
      * 
-     * @return whether modifiers should be integers
-     */
-    public boolean getValidateIntegerModifiers() {
-        return this.validateIntegerModifiers;
-    }
-
-    /**
-     * 
      * @return data type
      */
     public DataType getDataType() {
         return this.dataType;
     }
+
+    public boolean isMandatoryModifiers() {
+        return mandatoryModifiers;
+    }
+
+    public void setMandatoryModifiers(boolean mandatoryModifiers) {
+        this.mandatoryModifiers = mandatoryModifiers;
+    }
 }
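
For illustration, a minimal sketch of how the reworked enum metadata might be
consulted by a caller, assuming the pxf-api classes from this patch are on the
classpath:

    import org.apache.hawq.pxf.api.utilities.EnumHawqType;

    public class EnumHawqTypeDemo {
        public static void main(String[] args) {
            // After this change NumericType no longer demands modifiers:
            // "numeric" without (precision, scale) is accepted.
            EnumHawqType numeric = EnumHawqType.NumericType;
            System.out.println(numeric.getTypeName() + ": "
                    + numeric.isMandatoryModifiers());           // numeric: false
            // BpcharType still requires a length modifier.
            EnumHawqType bpchar = EnumHawqType.BpcharType;
            System.out.println(bpchar.getTypeName() + ": "
                    + bpchar.isMandatoryModifiers());            // bpchar: true
        }
    }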
 
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 43e3b65..606ddc6 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -127,7 +127,7 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
         for (int i = 0; i < numberOfDataColumns; i++) {
             ColumnDescriptor column = input.getColumn(i);
             String columnName = column.columnName();
-            String columnType = HiveUtilities.toHiveType(DataType.get(column.columnTypeCode()));
+            String columnType = HiveUtilities.toCompatibleHiveType(DataType.get(column.columnTypeCode()));
             columnNames.append(delim).append(columnName);
             columnTypes.append(delim).append(columnType);
             delim = ",";
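
For context, the resolver above now derives Hive column types through
HiveUtilities.toCompatibleHiveType. A small sketch of the columnTypes string it
builds, assuming EnumHiveToHawqType maps TEXT to "string" and NUMERIC to
"decimal":

    import org.apache.hawq.pxf.api.io.DataType;
    import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;

    public class SerdeColumnTypesDemo {
        public static void main(String[] args) {
            // Mirrors the columnTypes construction in HiveColumnarSerdeResolver.
            DataType[] cols = { DataType.TEXT, DataType.NUMERIC };
            StringBuilder columnTypes = new StringBuilder();
            String delim = "";
            for (DataType col : cols) {
                columnTypes.append(delim)
                           .append(HiveUtilities.toCompatibleHiveType(col));
                delim = ",";
            }
            System.out.println(columnTypes); // expected: string,decimal
        }
    }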

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index b944206..ccc8fa7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -148,14 +148,14 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
         for (FieldSchema hiveCol : hiveColumns) {
             ColumnDescriptor colDesc = inputData.getColumn(index++);
             DataType colType = DataType.get(colDesc.columnTypeCode());
-            HiveUtilities.compareTypes(colType, hiveCol.getType(), colDesc.columnName());
+            HiveUtilities.validateTypeCompatible(colType, colDesc.columnTypeModifiers(), hiveCol.getType(), colDesc.columnName());
         }
         // check partition fields
         List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
         for (FieldSchema hivePart : hivePartitions) {
             ColumnDescriptor colDesc = inputData.getColumn(index++);
             DataType colType = DataType.get(colDesc.columnTypeCode());
-            HiveUtilities.compareTypes(colType, hivePart.getType(), colDesc.columnName());
+            HiveUtilities.validateTypeCompatible(colType, colDesc.columnTypeModifiers(), hivePart.getType(), colDesc.columnName());
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
index 1cedaa8..9b24642 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
@@ -19,6 +19,11 @@
 
 package org.apache.hawq.pxf.plugins.hive.utilities;
 
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
@@ -30,8 +35,8 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException;
  */
 public enum EnumHiveToHawqType {
 
-    TinyintType("tinyint", EnumHawqType.Int2Type),
-    SmallintType("smallint", EnumHawqType.Int2Type),
+    TinyintType("tinyint", EnumHawqType.Int2Type, (byte) 1),
+    SmallintType("smallint", EnumHawqType.Int2Type, (byte) 2),
     IntType("int", EnumHawqType.Int4Type),
     BigintType("bigint", EnumHawqType.Int8Type),
     BooleanType("boolean", EnumHawqType.BoolType),
@@ -52,11 +57,17 @@ public enum EnumHiveToHawqType {
     private String typeName;
     private EnumHawqType hawqType;
     private String splitExpression;
+    private byte size;
 
     EnumHiveToHawqType(String typeName, EnumHawqType hawqType) {
         this.typeName = typeName;
         this.hawqType = hawqType;
     }
+    
+    EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) {
+        this(typeName, hawqType);
+        this.setSize(size);
+    }
 
     EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) {
         this(typeName, hawqType);
@@ -111,15 +122,51 @@ public enum EnumHiveToHawqType {
                 + hiveType + " to HAWQ's type");
     }
 
-    public static EnumHiveToHawqType getHawqToHiveType(DataType dataType) {
+    public static EnumHiveToHawqType getCompatibleHawqToHiveType(DataType dataType) {
 
-        for (EnumHiveToHawqType t : values()) {
+        SortedSet<EnumHiveToHawqType> types = new TreeSet<EnumHiveToHawqType>(new Comparator<EnumHiveToHawqType>() {
 
+            public int compare(EnumHiveToHawqType a, EnumHiveToHawqType b){
+                return Byte.compare(a.getSize(), b.getSize());
+            }
+        });
+
+        for (EnumHiveToHawqType t : values()) {
             if (t.getHawqType().getDataType().equals(dataType)) {
-                return t;
+                types.add(t);
+            }
+        }
+
+        if (types.size() == 0)
+            throw new UnsupportedTypeException("Unable to map HAWQ's type: "
+                    + dataType + " to Hive's type");
+
+        return types.last();
+    }
+
+    public static String[] extractModifiers(String hiveType) {
+        String[] result = null;
+        for (EnumHiveToHawqType t : values()) {
+            String hiveTypeName = hiveType;
+            String splitExpression = t.getSplitExpression();
+            if (splitExpression != null) {
+                String[] tokens = hiveType.split(splitExpression);
+                hiveTypeName = tokens[0];
+                result = Arrays.copyOfRange(tokens, 1, tokens.length);
+            }
+            if (t.getTypeName().toLowerCase().equals(hiveTypeName.toLowerCase())) {
+                return result;
             }
         }
-        throw new UnsupportedTypeException("Unable to map HAWQ's type: "
-                + dataType + " to Hive's type");
+        throw new UnsupportedTypeException("Unable to map Hive's type: "
+                + hiveType + " to HAWQ's type");
+    }
+
+    public byte getSize() {
+        return size;
+    }
+
+    public void setSize(byte size) {
+        this.size = size;
     }
 }
\ No newline at end of file
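
A short usage sketch of the two new helpers, based on the unit tests added
later in this commit:

    import java.util.Arrays;

    import org.apache.hawq.pxf.api.io.DataType;
    import org.apache.hawq.pxf.plugins.hive.utilities.EnumHiveToHawqType;

    public class TypeMappingDemo {
        public static void main(String[] args) {
            // Pull the precision/scale tokens out of a parameterized Hive type.
            String[] mods = EnumHiveToHawqType.extractModifiers("decimal(10,2)");
            System.out.println(Arrays.toString(mods)); // [10, 2]

            // Both tinyint (size 1) and smallint (size 2) map to HAWQ's int2;
            // the largest compatible Hive type by size is returned.
            EnumHiveToHawqType t =
                    EnumHiveToHawqType.getCompatibleHawqToHiveType(DataType.SMALLINT);
            System.out.println(t.getTypeName()); // smallint
        }
    }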

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index 579ab0b..6bda9b7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -139,7 +139,7 @@ public class HiveUtilities {
                                     + ", actual number of modifiers: "
                                     + modifiers.length);
                 }
-                if (hawqType.getValidateIntegerModifiers() && !verifyIntegerModifiers(modifiers)) {
+                if (!verifyIntegerModifiers(modifiers)) {
                     throw new UnsupportedTypeException("HAWQ does not support type " + hiveType + " (Field " + fieldName + "), modifiers should be integers");
                 }
             }
@@ -279,24 +279,41 @@ public class HiveUtilities {
      * @return Hive type
      * @throws UnsupportedTypeException if type is not supported
      */
-    public static String toHiveType(DataType type) {
+    public static String toCompatibleHiveType(DataType type) {
 
-        EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getHawqToHiveType(type);
+        EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getCompatibleHawqToHiveType(type);
         return hiveToHawqType.getTypeName();
     }
 
-    public static void compareTypes(DataType type, String hiveType, String columnName) {
-        String convertedHive = toHiveType(type);
-        if (!convertedHive.equals(hiveType)
-                && !(convertedHive.equals("smallint") && hiveType.equals("tinyint"))) {
-            throw new UnsupportedTypeException(
-                    "Schema mismatch definition:" 
-                            + " (Hive type " + hiveType + ", HAWQ type "
-                            + type.toString() + ")");
+
+
+    public static void validateTypeCompatible(DataType hawqDataType, String[] hawqTypeMods, String hiveType, String hawqColumnName) {
+
+        EnumHiveToHawqType hiveToHawqType = EnumHiveToHawqType.getHiveToHawqType(hiveType);
+        EnumHawqType expectedHawqType = hiveToHawqType.getHawqType();
+
+        if ((hawqTypeMods == null || hawqTypeMods.length == 0) && expectedHawqType.isMandatoryModifiers())
+            throw new UnsupportedTypeException("Invalid definition for column " + hawqColumnName +  ": modifiers are mandatory for type " + expectedHawqType.getTypeName());
+
+        switch (hawqDataType) {
+        case NUMERIC:
+            String[] hiveTypeModifiers = EnumHiveToHawqType.extractModifiers(hiveType);
+            for (int i = 0; hawqTypeMods != null && i < hawqTypeMods.length; i++) {
+                if (Integer.valueOf(hawqTypeMods[i]) < Integer
+                        .valueOf(hiveTypeModifiers[i]))
+                    throw new UnsupportedTypeException(
+                            "Invalid definition for column " + hawqColumnName 
+                                    +  ": modifiers are not compatible, "
+                                    + Arrays.toString(hiveTypeModifiers) + ", "
+                                    + Arrays.toString(hawqTypeMods));
+            }
+            break;
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(" Hive type " + hiveType
-                    + ", HAWQ type " + type.toString());
+
+        if (expectedHawqType.getDataType() != hawqDataType) {
+            throw new UnsupportedTypeException("Invalid definition for column " + hawqColumnName
+                    + ": expected HAWQ type " + expectedHawqType.getDataType()
+                    + ", actual HAWQ type " + hawqDataType);
         }
     }
 }
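
A minimal sketch of the new validation entry point, mirroring the cases
exercised by the validateSchema test below:

    import org.apache.hawq.pxf.api.UnsupportedTypeException;
    import org.apache.hawq.pxf.api.io.DataType;
    import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;

    public class ValidateTypeDemo {
        public static void main(String[] args) {
            // Compatible: HAWQ numeric(38,18) can hold Hive decimal(38,18).
            HiveUtilities.validateTypeCompatible(DataType.NUMERIC,
                    new String[] { "38", "18" }, "decimal(38,18)", "total");

            // Incompatible: HAWQ scale 17 is narrower than Hive scale 18.
            try {
                HiveUtilities.validateTypeCompatible(DataType.NUMERIC,
                        new String[] { "38", "17" }, "decimal(38,18)", "total");
            } catch (UnsupportedTypeException e) {
                System.out.println(e.getMessage());
            }
        }
    }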

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
index e9b024a..e94351a 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
@@ -22,11 +22,16 @@ package org.apache.hawq.pxf.plugins.hive.utilities;
 
 import static org.junit.Assert.*;
 
+import java.util.Arrays;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.junit.Test;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.plugins.hive.utilities.EnumHiveToHawqType;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 
 public class HiveUtilitiesTest {
 
@@ -121,6 +126,66 @@ public class HiveUtilitiesTest {
     }
 
     @Test
+    public void testCompatibleHiveType() {
+        String compatibleTypeName = HiveUtilities.toCompatibleHiveType(DataType.SMALLINT);
+        assertEquals(compatibleTypeName, EnumHiveToHawqType.SmallintType.getTypeName());
+    }
+
+    @Test
+    public void validateSchema() throws Exception {
+        String columnName = "abc";
+
+        String[] hawqModifiers = {};
+        HiveUtilities.validateTypeCompatible(DataType.SMALLINT, hawqModifiers, EnumHiveToHawqType.TinyintType.getTypeName(), columnName);
+
+        HiveUtilities.validateTypeCompatible(DataType.SMALLINT, hawqModifiers, EnumHiveToHawqType.SmallintType.getTypeName(), columnName);
+
+        //Both Hive and HAWQ types have the same modifiers
+        hawqModifiers = new String[]{"38", "18"};
+        HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+
+        //HAWQ datatype doesn't require modifiers, they are empty, Hive has non-empty modifiers
+        //Types are compatible in this case
+        hawqModifiers = new String[]{};
+        HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+        hawqModifiers = null;
+        HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+
+        //HAWQ datatype requires modifiers but they aren't provided
+        //Types aren't compatible
+        try {
+            hawqModifiers = new String[]{};
+            HiveUtilities.validateTypeCompatible(DataType.VARCHAR, hawqModifiers, "varchar", columnName);
+            fail("should fail with incompatible modifiers message");
+        }
+        catch (UnsupportedTypeException e) {
+            String errorMsg = "Invalid definition for column " + columnName +  ": modifiers are mandatory for type " + EnumHawqType.VarcharType.getTypeName();
+            assertEquals(errorMsg, e.getMessage());
+        }
+
+
+        //HAWQ has lesser modifiers than Hive, types aren't compatible
+        try {
+            hawqModifiers = new String[]{"38", "17"};
+            HiveUtilities.validateTypeCompatible(DataType.NUMERIC, hawqModifiers, "decimal(38,18)", columnName);
+            fail("should fail with incompatible modifiers message");
+        }
+        catch (UnsupportedTypeException e) {
+            String errorMsg = "Invalid definition for column " + columnName 
+                    +  ": modifiers are not compatible, "
+                    + Arrays.toString(new String[]{"38", "18"}) + ", "
+                    + Arrays.toString(new String[]{"38", "17"});
+            assertEquals(errorMsg, e.getMessage());
+        }
+    }
+
+    @Test
+    public void extractModifiers() throws Exception {
+        String[] mods = EnumHiveToHawqType.extractModifiers("decimal(10,2)");
+        assertEquals(mods, new String[]{"10", "2"});
+    }
+
+    @Test
     public void mapHiveTypeWithModifiersNegative() throws Exception {
 
         String badHiveType = "decimal(2)";

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 5e6f6c4..2838232 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -402,9 +402,10 @@ public class ProtocolData extends InputData {
     }
 
     private String[] parseTypeMods(int columnIndex) {
-        Integer typeModeCount = Integer.parseInt(getOptionalProperty("ATTR-TYPEMOD" + columnIndex + "COUNT"));
+        String typeModeCountStr = getOptionalProperty("ATTR-TYPEMOD" + columnIndex + "COUNT");
         String[] result = null;
-        if (typeModeCount > 0) {
+        if (typeModeCountStr != null) {
+            Integer typeModeCount = Integer.parseInt(typeModeCountStr);
             result = new String[typeModeCount];
             for (int i = 0; i < typeModeCount; i++) {
                 result[i] = getProperty("ATTR-TYPEMOD" + columnIndex + "-" + i);
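
The fix above makes the type-modifier count optional: previously a request
without ATTR-TYPEMOD<i>COUNT crashed in Integer.parseInt(null) with a
NumberFormatException. A standalone sketch of the same null-safe pattern, with
java.util.Properties standing in for the request parameters:

    import java.util.Arrays;
    import java.util.Properties;

    public class TypeModsDemo {
        static String[] parseTypeMods(Properties props, int columnIndex) {
            String countStr = props.getProperty("ATTR-TYPEMOD" + columnIndex + "COUNT");
            String[] result = null;
            if (countStr != null) { // absent count now means "no modifiers"
                int count = Integer.parseInt(countStr);
                result = new String[count];
                for (int i = 0; i < count; i++) {
                    result[i] = props.getProperty("ATTR-TYPEMOD" + columnIndex + "-" + i);
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("ATTR-TYPEMOD0COUNT", "2");
            props.setProperty("ATTR-TYPEMOD0-0", "38");
            props.setProperty("ATTR-TYPEMOD0-1", "18");
            System.out.println(Arrays.toString(parseTypeMods(props, 0))); // [38, 18]
            System.out.println(parseTypeMods(props, 1)); // null, no modifiers sent
        }
    }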

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java
new file mode 100644
index 0000000..bfd862a
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/Bridge.java
@@ -0,0 +1,40 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.service.io.Writable;
+
+import java.io.DataInputStream;
+
+/**
+ * Bridge interface - defines the interface of the Bridge classes. Any Bridge
+ * class acts as an iterator over Hadoop stored data, and should implement
+ * getNext (for reading) or setNext (for writing) for handling accessed data.
+ */
+public interface Bridge {
+    boolean beginIteration() throws Exception;
+
+    Writable getNext() throws Exception;
+
+    boolean setNext(DataInputStream inputStream) throws Exception;
+
+    boolean isThreadSafe();
+}
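
A hypothetical consumer of the read-side contract, assuming Writable exposes a
Hadoop-style write(DataOutput) method (sketch only, not part of this patch):

    import java.io.DataOutputStream;

    import org.apache.hawq.pxf.service.Bridge;
    import org.apache.hawq.pxf.service.io.Writable;

    public class BridgeReadLoop {
        // Drains a read bridge, writing each serialized record to the stream.
        static void drain(Bridge bridge, DataOutputStream out) throws Exception {
            if (!bridge.beginIteration()) {
                return;
            }
            Writable record;
            while ((record = bridge.getNext()) != null) {
                record.write(out);
            }
        }
    }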

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java
new file mode 100644
index 0000000..4b4d2e8
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeInputBuilder.java
@@ -0,0 +1,71 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInput;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class BridgeInputBuilder {
+    private ProtocolData protocolData;
+    private static final Log LOG = LogFactory.getLog(BridgeInputBuilder.class);
+
+    public BridgeInputBuilder(ProtocolData protocolData) throws Exception {
+        this.protocolData = protocolData;
+    }
+
+    public List<OneField> makeInput(DataInput inputStream) throws Exception {
+        if (protocolData.outputFormat() == OutputFormat.TEXT) {
+            Text txt = new Text();
+            txt.readFields(inputStream);
+            return Collections.singletonList(new OneField(DataType.BYTEA.getOID(), txt.getBytes()));
+        }
+
+        GPDBWritable gpdbWritable = new GPDBWritable();
+        gpdbWritable.readFields(inputStream);
+
+        if (gpdbWritable.isEmpty()) {
+            LOG.debug("Reached end of stream");
+            return null;
+        }
+
+        GPDBWritableMapper mapper = new GPDBWritableMapper(gpdbWritable);
+        int[] colTypes = gpdbWritable.getColType();
+        List<OneField> record = new LinkedList<OneField>();
+        for (int i = 0; i < colTypes.length; i++) {
+            mapper.setDataType(colTypes[i]);
+            record.add(new OneField(colTypes[i], mapper.getData(i)));
+        }
+
+        return record;
+    }
+}
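
A sketch of a write-side loop over makeInput; per the code above, the
GPDBWritable (binary) path signals end of stream by returning null, while the
TEXT path ends when the underlying stream is exhausted:

    import java.io.DataInputStream;

    import org.apache.hawq.pxf.service.BridgeInputBuilder;

    public class InputLoopSketch {
        // Counts binary-format records read off the stream until makeInput
        // returns null (hypothetical caller, sketch only).
        static int countRecords(BridgeInputBuilder builder, DataInputStream in)
                throws Exception {
            int count = 0;
            while (builder.makeInput(in) != null) {
                count++;
            }
            return count;
        }
    }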

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
new file mode 100644
index 0000000..c59fbea
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -0,0 +1,394 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.BufferWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.TEXT;
+
+/**
+ * Class that creates the output record piped by the Java process to the HAWQ
+ * backend. In practice, the output record is serialized and the resulting byte
+ * string is piped to the HAWQ segment. The output record will implement
+ * Writable, and the mission of BridgeOutputBuilder will be to translate a list
+ * of {@link OneField} objects (obtained from the Resolver) into an output
+ * record.
+ */
+public class BridgeOutputBuilder {
+    private ProtocolData inputData;
+    private Writable output = null;
+    private LinkedList<Writable> outputList = null;
+    private Writable partialLine = null;
+    private GPDBWritable errorRecord = null;
+    private int[] schema;
+    private String[] colNames;
+    private boolean samplingEnabled = false;
+    private boolean isPartialLine = false;
+
+    private static final byte DELIM = 10; /* (byte)'\n'; */
+
+    private static final Log LOG = LogFactory.getLog(BridgeOutputBuilder.class);
+
+    /**
+     * Constructs a BridgeOutputBuilder.
+     *
+     * @param input input data, like requested output format and schema
+     *            information
+     */
+    public BridgeOutputBuilder(ProtocolData input) {
+        inputData = input;
+        outputList = new LinkedList<Writable>();
+        makeErrorRecord();
+        samplingEnabled = (inputData.getStatsSampleRatio() > 0);
+    }
+
+    /**
+     * We need a separate GPDBWritable record to represent the error record.
+     * Just setting the errorFlag on the "output" GPDBWritable variable is not
+     * good enough, since the GPDBWritable is built only after the first record
+     * is read from the file. And if we encounter an error while fetching the
+     * first record from the file, then the output member will be null. The
+     * reason we cannot count on the schema to build the GPDBWritable output
+     * variable before reading the first record, is because the schema does not
+     * account for arrays - we cannot know from the schema the length of an
+     * array. We find out only after fetching the first record.
+     */
+    void makeErrorRecord() {
+        int[] errSchema = { TEXT.getOID() };
+
+        if (inputData.outputFormat() != OutputFormat.BINARY) {
+            return;
+        }
+
+        errorRecord = new GPDBWritable(errSchema);
+        errorRecord.setError(true);
+    }
+
+    /**
+     * Returns the error record. If the output format is not binary, error
+     * records are not supported, and the given exception will be thrown
+     *
+     * @param ex exception to be stored in record
+     * @return error record
+     * @throws Exception if the output format is not binary
+     */
+    public Writable getErrorOutput(Exception ex) throws Exception {
+        if (inputData.outputFormat() == OutputFormat.BINARY) {
+            errorRecord.setString(0, ex.getMessage());
+            return errorRecord;
+        } else {
+            throw ex;
+        }
+    }
+
+    /**
+     * Translates recFields (obtained from the Resolver) into an output record.
+     *
+     * @param recFields record fields to be serialized
+     * @return list of Writable objects with serialized row
+     * @throws BadRecordException if building the output record failed
+     */
+    public LinkedList<Writable> makeOutput(List<OneField> recFields)
+            throws BadRecordException {
+        if (output == null && inputData.outputFormat() == OutputFormat.BINARY) {
+            makeGPDBWritableOutput();
+        }
+
+        outputList.clear();
+
+        fillOutputRecord(recFields);
+
+        return outputList;
+    }
+
+    /**
+     * Returns the partial line left over from the previous data chunk, if any.
+     *
+     * @return the partial line, or null if the last chunk ended on a line boundary
+     */
+    public Writable getPartialLine() {
+        return partialLine;
+    }
+
+    /**
+     * Creates the GPDBWritable object. The object is created one time and is
+     * refilled from recFields for each record sent
+     *
+     * @return empty GPDBWritable object with set columns
+     */
+    GPDBWritable makeGPDBWritableOutput() {
+        int num_actual_fields = inputData.getColumns();
+        schema = new int[num_actual_fields];
+        colNames = new String[num_actual_fields];
+
+        for (int i = 0; i < num_actual_fields; i++) {
+            schema[i] = inputData.getColumn(i).columnTypeCode();
+            colNames[i] = inputData.getColumn(i).columnName();
+        }
+
+        output = new GPDBWritable(schema);
+
+        return (GPDBWritable) output;
+    }
+
+    /**
+     * Fills the output record based on the fields in recFields.
+     *
+     * @param recFields record fields
+     * @throws BadRecordException if building the output record failed
+     */
+    void fillOutputRecord(List<OneField> recFields) throws BadRecordException {
+        if (inputData.outputFormat() == OutputFormat.BINARY) {
+            fillGPDBWritable(recFields);
+        } else {
+            fillText(recFields);
+        }
+    }
+
+    /**
+     * Fills a GPDBWritable object based on recFields. The input record
+     * recFields must correspond to schema. If the record has more or less
+     * fields than the schema we throw an exception. We require that the type of
+     * field[i] in recFields corresponds to the type of field[i] in the schema.
+     *
+     * @param recFields record fields
+     * @throws BadRecordException if building the output record failed
+     */
+    void fillGPDBWritable(List<OneField> recFields) throws BadRecordException {
+        int size = recFields.size();
+        if (size == 0) { // size 0 means the resolver couldn't deserialize any
+                         // of the record fields
+            throw new BadRecordException("No fields in record");
+        } else if (size != schema.length) {
+            throw new BadRecordException("Record has " + size
+                    + " fields but the schema size is " + schema.length);
+        }
+
+        for (int i = 0; i < size; i++) {
+            OneField current = recFields.get(i);
+            if (!isTypeInSchema(current.type, schema[i])) {
+                throw new BadRecordException("For field " + colNames[i]
+                        + " schema requires type "
+                        + DataType.get(schema[i]).toString()
+                        + " but input record has type "
+                        + DataType.get(current.type).toString());
+            }
+
+            fillOneGPDBWritableField(current, i);
+        }
+
+        outputList.add(output);
+    }
+
+    /**
+     * Tests if data type is a string type. String type is a type that can be
+     * serialized as string, such as varchar, bpchar, text, numeric, timestamp,
+     * date.
+     *
+     * @param type data type
+     * @return whether data type is string type
+     */
+    boolean isStringType(DataType type) {
+        return Arrays.asList(DataType.VARCHAR, DataType.BPCHAR, DataType.TEXT,
+                DataType.NUMERIC, DataType.TIMESTAMP, DataType.DATE).contains(
+                type);
+    }
+
+    /**
+     * Tests if record field type and schema type correspond.
+     *
+     * @param recType record type code
+     * @param schemaType schema type code
+     * @return whether record type and schema type match
+     */
+    boolean isTypeInSchema(int recType, int schemaType) {
+        DataType dtRec = DataType.get(recType);
+        DataType dtSchema = DataType.get(schemaType);
+
+        return (dtSchema == DataType.UNSUPPORTED_TYPE || dtRec == dtSchema || (isStringType(dtRec) && isStringType(dtSchema)));
+    }
+
+    /**
+     * Fills a Text object based on recFields.
+     *
+     * @param recFields record fields
+     * @throws BadRecordException if text formatted record has more than one
+     *             field
+     */
+    void fillText(List<OneField> recFields) throws BadRecordException {
+        /*
+         * For the TEXT case there must be only one record in the list
+         */
+        if (recFields.size() != 1) {
+            throw new BadRecordException(
+                    "BridgeOutputBuilder must receive one field when handling the TEXT format");
+        }
+
+        OneField fld = recFields.get(0);
+        int type = fld.type;
+        Object val = fld.val;
+        if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor
+            if (samplingEnabled) {
+                convertTextDataToLines((byte[]) val);
+            } else {
+                output = new BufferWritable((byte[]) val);
+                outputList.add(output); // TODO break output into lines
+            }
+        } else { // from QuotedLineBreakAccessor
+            String textRec = (String) val;
+            output = new Text(textRec + "\n");
+            outputList.add(output);
+        }
+    }
+
+    /**
+     * Breaks raw bytes into lines. Used only for sampling.
+     *
+     * When sampling a data source, we have to make sure that we deal with
+     * actual rows (lines) and not bigger chunks of data such as used by
+     * LineBreakAccessor for performance. The input byte array is broken into
+     * lines, each one stored in the outputList. In case the read data doesn't
+     * end with a line delimiter, which can happen when reading chunks of bytes,
+     * the partial line is stored separately, and is being completed when
+     * reading the next chunk of data.
+     *
+     * @param val input raw data to break into lines
+     */
+    void convertTextDataToLines(byte[] val) {
+        int len = val.length;
+        int start = 0;
+        int end = 0;
+        byte[] line;
+        BufferWritable writable;
+
+        while (start < len) {
+            end = ArrayUtils.indexOf(val, DELIM, start);
+            if (end == ArrayUtils.INDEX_NOT_FOUND) {
+                // data finished in the middle of the line
+                end = len;
+                isPartialLine = true;
+            } else {
+                end++; // include the DELIM character
+                isPartialLine = false;
+            }
+            line = Arrays.copyOfRange(val, start, end);
+
+            if (partialLine != null) {
+                // partial data was completed
+                ((BufferWritable) partialLine).append(line);
+                writable = (BufferWritable) partialLine;
+                partialLine = null;
+            } else {
+                writable = new BufferWritable(line);
+            }
+
+            if (isPartialLine) {
+                partialLine = writable;
+            } else {
+                outputList.add(writable);
+            }
+            start = end;
+        }
+    }
+
+    /**
+     * Fills one GPDBWritable field.
+     *
+     * @param oneField field
+     * @param colIdx column index
+     * @throws BadRecordException if field type is not supported or doesn't
+     *             match the schema
+     */
+    void fillOneGPDBWritableField(OneField oneField, int colIdx)
+            throws BadRecordException {
+        int type = oneField.type;
+        Object val = oneField.val;
+        GPDBWritable gpdbOutput = (GPDBWritable) output;
+        try {
+            switch (DataType.get(type)) {
+                case INTEGER:
+                    gpdbOutput.setInt(colIdx, (Integer) val);
+                    break;
+                case FLOAT8:
+                    gpdbOutput.setDouble(colIdx, (Double) val);
+                    break;
+                case REAL:
+                    gpdbOutput.setFloat(colIdx, (Float) val);
+                    break;
+                case BIGINT:
+                    gpdbOutput.setLong(colIdx, (Long) val);
+                    break;
+                case SMALLINT:
+                    gpdbOutput.setShort(colIdx, (Short) val);
+                    break;
+                case BOOLEAN:
+                    gpdbOutput.setBoolean(colIdx, (Boolean) val);
+                    break;
+                case BYTEA:
+                    byte[] bts = null;
+                    if (val != null) {
+                        int length = Array.getLength(val);
+                        bts = new byte[length];
+                        for (int j = 0; j < length; j++) {
+                            bts[j] = Array.getByte(val, j);
+                        }
+                    }
+                    gpdbOutput.setBytes(colIdx, bts);
+                    break;
+                case VARCHAR:
+                case BPCHAR:
+                case CHAR:
+                case TEXT:
+                case NUMERIC:
+                case TIMESTAMP:
+                case DATE:
+                    gpdbOutput.setString(colIdx,
+                            ObjectUtils.toString(val, null));
+                    break;
+                default:
+                    String valClassName = (val != null) ? val.getClass().getSimpleName()
+                            : null;
+                    throw new UnsupportedOperationException(valClassName
+                            + " is not supported for HAWQ conversion");
+            }
+        } catch (TypeMismatchException e) {
+            throw new BadRecordException(e);
+        }
+    }
+}
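
A sketch of how a caller might drive the builder, again assuming a
Hadoop-style Writable.write(DataOutput) method (hypothetical code):

    import java.io.DataOutputStream;
    import java.util.LinkedList;
    import java.util.List;

    import org.apache.hawq.pxf.api.OneField;
    import org.apache.hawq.pxf.service.BridgeOutputBuilder;
    import org.apache.hawq.pxf.service.io.Writable;

    public class OutputLoopSketch {
        // Serializes one resolved record; a real caller loops over all records
        // and, when sampling, also flushes any trailing partial line.
        static void emit(BridgeOutputBuilder builder, List<OneField> resolved,
                         DataOutputStream out) throws Exception {
            LinkedList<Writable> rows = builder.makeOutput(resolved);
            for (Writable row : rows) {
                row.write(out);
            }
        }
    }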

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java
new file mode 100644
index 0000000..c516d69
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmenterFactory.java
@@ -0,0 +1,37 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+
+/**
+ * Factory class for creating {@link Fragmenter} objects. The concrete {@link Fragmenter}
+ * implementation is "hidden" behind the {@link Fragmenter} abstract class returned by the factory.
+ */
+public class FragmenterFactory {
+    public static Fragmenter create(InputData inputData) throws Exception {
+        String fragmenterName = inputData.getFragmenter();
+
+        return (Fragmenter) Utilities.createAnyInstance(InputData.class, fragmenterName, inputData);
+    }
+}
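
Usage is a one-liner; the concrete class named by inputData.getFragmenter() is
instantiated reflectively through Utilities.createAnyInstance:

    import org.apache.hawq.pxf.api.Fragmenter;
    import org.apache.hawq.pxf.api.utilities.InputData;
    import org.apache.hawq.pxf.service.FragmenterFactory;

    public class FragmenterFactoryDemo {
        // For a Hive profile the request would name, e.g., the
        // HiveInputFormatFragmenter patched above (illustrative only).
        static Fragmenter newFragmenter(InputData inputData) throws Exception {
            return FragmenterFactory.create(inputData);
        }
    }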

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java
new file mode 100644
index 0000000..d6efcae
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponse.java
@@ -0,0 +1,89 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.apache.hawq.pxf.api.Fragment;
+
+/**
+ * Class for serializing fragments metadata in JSON format. The class implements
+ * {@link StreamingOutput} so that serialization is done as a stream rather than
+ * in one bulk write, in order to avoid running out of memory when processing a
+ * large number of fragments.
+ */
+public class FragmentsResponse implements StreamingOutput {
+
+    private static final Log Log = LogFactory.getLog(FragmentsResponse.class);
+
+    private List<Fragment> fragments;
+
+    /**
+     * Constructs fragments response out of a list of fragments
+     *
+     * @param fragments fragment list
+     */
+    public FragmentsResponse(List<Fragment> fragments) {
+        this.fragments = fragments;
+    }
+
+    /**
+     * Serializes a fragments list in JSON, to be used as the result string for
+     * HAWQ. An example result is as follows:
+     * <code>{"PXFFragments":[{"replicas":
+     * ["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],
+     * "sourceName":"text2.csv", "index":"0","metadata":"&lt;base64 metadata for fragment&gt;",
+     * "userData":"&lt;data_specific_to_third_party_fragmenter&gt;"
+     * },{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"
+     * ],"sourceName":"text_data.csv","index":"0","metadata":
+     * "&lt;base64 metadata for fragment&gt;"
+     * ,"userData":"&lt;data_specific_to_third_party_fragmenter&gt;"
+     * }]}</code>
+     */
+    @Override
+    public void write(OutputStream output) throws IOException,
+            WebApplicationException {
+        DataOutputStream dos = new DataOutputStream(output);
+        ObjectMapper mapper = new ObjectMapper();
+
+        dos.write("{\"PXFFragments\":[".getBytes());
+
+        String prefix = "";
+        for (Fragment fragment : fragments) {
+            StringBuilder result = new StringBuilder();
+            /* metaData and userData are automatically converted to Base64 */
+            result.append(prefix).append(mapper.writeValueAsString(fragment));
+            prefix = ",";
+            dos.write(result.toString().getBytes());
+        }
+
+        dos.write("]}".getBytes());
+    }
+}
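
A sketch of returning this streaming entity from a JAX-RS resource
(hypothetical endpoint; FragmenterResource in this patch presumably plays this
role):

    import java.util.List;

    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    import org.apache.hawq.pxf.api.Fragment;
    import org.apache.hawq.pxf.service.FragmentsResponse;

    public class FragmentsEndpointSketch {
        // JAX-RS recognizes the StreamingOutput entity and pulls the JSON from
        // write(OutputStream) lazily instead of buffering it in memory.
        static Response toResponse(List<Fragment> fragments) {
            return Response.ok(new FragmentsResponse(fragments),
                    MediaType.APPLICATION_JSON_TYPE).build();
        }
    }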

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
new file mode 100644
index 0000000..14e87f9
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
@@ -0,0 +1,157 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Utility class for converting Fragments into a {@link FragmentsResponse} that
+ * will serialize them into JSON format.
+ */
+public class FragmentsResponseFormatter {
+
+    private static final Log LOG = LogFactory.getLog(FragmentsResponseFormatter.class);
+
+    /**
+     * Converts a Fragments list to a FragmentsResponse after replacing host names
+     * with their respective IPs.
+     *
+     * @param fragments list of fragments
+     * @param data data (e.g. path) related to the fragments
+     * @return FragmentsResponse with given fragments
+     * @throws UnknownHostException if converting host names to IP fails
+     */
+    public static FragmentsResponse formatResponse(List<Fragment> fragments,
+                                                   String data)
+            throws UnknownHostException {
+        /* print the raw fragment list to log when in debug level */
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Fragments before conversion to IP list:");
+            FragmentsResponseFormatter.printList(fragments, data);
+        }
+
+        /* HD-2550: convert host names to IPs */
+        convertHostsToIPs(fragments);
+
+        updateFragmentIndex(fragments);
+
+        /* print the fragment list to log when in debug level */
+        if (LOG.isDebugEnabled()) {
+            FragmentsResponseFormatter.printList(fragments, data);
+        }
+
+        return new FragmentsResponse(fragments);
+    }
+
+    /**
+     * Updates the fragments' indexes so that they are numbered consecutively
+     * within each sourceName. (E.g.: {"a", 0}, {"a", 1}, {"b", 0} ... )
+     *
+     * @param fragments fragments to be updated
+     */
+    private static void updateFragmentIndex(List<Fragment> fragments) {
+
+        String sourceName = null;
+        int index = 0;
+        for (Fragment fragment : fragments) {
+
+            String currentSourceName = fragment.getSourceName();
+            if (!currentSourceName.equals(sourceName)) {
+                index = 0;
+                sourceName = currentSourceName;
+            }
+            fragment.setIndex(index++);
+        }
+    }
+
+    /**
+     * Converts hosts to their matching IP addresses.
+     *
+     * @throws UnknownHostException if converting host name to IP fails
+     */
+    private static void convertHostsToIPs(List<Fragment> fragments)
+            throws UnknownHostException {
+        /* host converted to IP map. Used to limit network calls. */
+        HashMap<String, String> hostToIpMap = new HashMap<String, String>();
+
+        for (Fragment fragment : fragments) {
+            String[] hosts = fragment.getReplicas();
+            if (hosts == null) {
+                continue;
+            }
+            String[] ips = new String[hosts.length];
+            int index = 0;
+
+            for (String host : hosts) {
+                String convertedIp = hostToIpMap.get(host);
+                if (convertedIp == null) {
+                    /* find host's IP, and add to map */
+                    InetAddress addr = InetAddress.getByName(host);
+                    convertedIp = addr.getHostAddress();
+                    hostToIpMap.put(host, convertedIp);
+                }
+
+                /* update IPs array */
+                ips[index] = convertedIp;
+                ++index;
+            }
+            fragment.setReplicas(ips);
+        }
+    }
+
+    /*
+     * Converts a fragments list to a readable string and prints it to the log.
+     * Intended for debugging purposes only. 'datapath' is the data path part of
+     * the original URI (e.g., table name, *.csv, etc).
+     */
+    private static void printList(List<Fragment> fragments, String datapath) {
+        LOG.debug("List of " + (fragments.isEmpty() ? "no" : fragments.size())
+                + "fragments for \"" + datapath + "\"");
+
+        int i = 0;
+        for (Fragment fragment : fragments) {
+            StringBuilder result = new StringBuilder();
+            result.append("Fragment #").append(++i).append(": [").append(
+                    "Source: ").append(fragment.getSourceName()).append(
+                    ", Index: ").append(fragment.getIndex()).append(
+                    ", Replicas:");
+            for (String host : fragment.getReplicas()) {
+                result.append(" ").append(host);
+            }
+
+            result.append(", Metadata: ").append(
+                    new String(fragment.getMetadata()));
+
+            if (fragment.getUserData() != null) {
+                result.append(", User Data: ").append(
+                        new String(fragment.getUserData()));
+            }
+            result.append("] ");
+            LOG.debug(result);
+        }
+    }
+}
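
Putting the formatter together, a minimal sketch (Fragment construction
elided; the path argument is only used for debug logging):

    import java.util.List;

    import org.apache.hawq.pxf.api.Fragment;
    import org.apache.hawq.pxf.service.FragmentsResponse;
    import org.apache.hawq.pxf.service.FragmentsResponseFormatter;

    public class FormatterSketch {
        // Host names are replaced with IPs and indexes renumbered per source
        // before the response is handed to the JAX-RS layer.
        static FragmentsResponse format(List<Fragment> fragments) throws Exception {
            return FragmentsResponseFormatter.formatResponse(fragments, "/data/*.csv");
        }
    }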

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java
new file mode 100644
index 0000000..e1c2eb4
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/GPDBWritableMapper.java
@@ -0,0 +1,135 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.service.io.GPDBWritable;
+import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
+
+/**
+ * Class for mapping {@link GPDBWritable} get functions to Java types.
+ */
+public class GPDBWritableMapper {
+
+    private GPDBWritable gpdbWritable;
+    private int type;
+    private DataGetter getter = null;
+
+    public GPDBWritableMapper(GPDBWritable gpdbWritable) {
+        this.gpdbWritable = gpdbWritable;
+    }
+
+    public void setDataType(int type) throws UnsupportedTypeException {
+        this.type = type;
+
+        switch (DataType.get(type)) {
+            case BOOLEAN:
+                getter = new BooleanDataGetter();
+                break;
+            case BYTEA:
+                getter = new BytesDataGetter();
+                break;
+            case BIGINT:
+                getter = new LongDataGetter();
+                break;
+            case SMALLINT:
+                getter = new ShortDataGetter();
+                break;
+            case INTEGER:
+                getter = new IntDataGetter();
+                break;
+            case TEXT:
+                getter = new StringDataGetter();
+                break;
+            case REAL:
+                getter = new FloatDataGetter();
+                break;
+            case FLOAT8:
+                getter = new DoubleDataGetter();
+                break;
+            default:
+                throw new UnsupportedTypeException(
+                        "Type " + GPDBWritable.getTypeName(type) +
+                                " is not supported by GPDBWritable");
+        }
+    }
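+
+    /*
+     * A minimal usage sketch: one mapper wraps a deserialized record, and
+     * setDataType() is called per column before getData(). The variables
+     * `record` and `colTypes` below are hypothetical placeholders for a
+     * deserialized GPDBWritable and its column type OIDs:
+     *
+     *   GPDBWritableMapper mapper = new GPDBWritableMapper(record);
+     *   for (int i = 0; i < colTypes.length; i++) {
+     *       mapper.setDataType(colTypes[i]); // selects the matching getter
+     *       Object value = mapper.getData(i); // e.g. a Long for BIGINT columns
+     *   }
+     */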
+
+    public Object getData(int colIdx) throws TypeMismatchException {
+        return getter.getData(colIdx);
+    }
+
+    private interface DataGetter {
+        Object getData(int colIdx) throws TypeMismatchException;
+    }
+
+    private class BooleanDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getBoolean(colIdx);
+        }
+    }
+
+    private class BytesDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getBytes(colIdx);
+        }
+    }
+
+    private class DoubleDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getDouble(colIdx);
+        }
+    }
+
+    private class FloatDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getFloat(colIdx);
+        }
+    }
+
+    private class IntDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getInt(colIdx);
+        }
+    }
+
+    private class LongDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getLong(colIdx);
+        }
+    }
+
+    private class ShortDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getShort(colIdx);
+        }
+    }
+
+    private class StringDataGetter implements DataGetter {
+        public Object getData(int colIdx) throws TypeMismatchException {
+            return gpdbWritable.getString(colIdx);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "getter type = " + GPDBWritable.getTypeName(type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
new file mode 100644
index 0000000..396b711
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataFetcherFactory.java
@@ -0,0 +1,36 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+
+/**
+ * Factory class for creating {@link MetadataFetcher} objects. The concrete
+ * implementation is "hidden" behind the abstract {@link MetadataFetcher}
+ * type returned by the MetadataFetcherFactory.
+ */
+public class MetadataFetcherFactory {
+    public static MetadataFetcher create(InputData inputData) throws Exception {
+        return (MetadataFetcher) Utilities.createAnyInstance(InputData.class, inputData.getMetadata(), inputData);
+    }
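+
+    /*
+     * A minimal usage sketch; the metadata property of `inputData` is
+     * expected to name the fetcher class. The retrieval call below is a
+     * hypothetical placeholder for whatever method the concrete
+     * MetadataFetcher exposes:
+     *
+     *   MetadataFetcher fetcher = MetadataFetcherFactory.create(inputData);
+     *   List<Metadata> metadataList = fetcher.getMetadata(pattern); // hypothetical call
+     */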
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java
new file mode 100644
index 0000000..741e201
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponse.java
@@ -0,0 +1,93 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import java.util.List;
+
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.Metadata;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+/**
+ * Class for serializing metadata in JSON format. The class implements
+ * {@link StreamingOutput} so that serialization is done as a stream rather
+ * than in one bulk operation, in order to avoid running out of memory when
+ * processing a large number of items.
+ */
+public class MetadataResponse implements StreamingOutput {
+
+    private static final Log LOG = LogFactory.getLog(MetadataResponse.class);
+    private static final String METADATA_DEFAULT_RESPONSE = "{\"PXFMetadata\":[]}";
+
+    private List<Metadata> metadataList;
+
+    /**
+     * Constructs metadata response out of a metadata list
+     *
+     * @param metadataList metadata list
+     */
+    public MetadataResponse(List<Metadata> metadataList) {
+        this.metadataList = metadataList;
+    }
+
+    /**
+     * Serializes the metadata list to JSON, to be used as the result string for HAWQ.
+     */
+    @Override
+    public void write(OutputStream output) throws IOException {
+        DataOutputStream dos = new DataOutputStream(output);
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(org.codehaus.jackson.map.SerializationConfig.Feature.USE_ANNOTATIONS, true); // enable annotations for serialization
+        mapper.setSerializationInclusion(Inclusion.NON_EMPTY); // ignore empty fields
+
+        if (metadataList == null || metadataList.isEmpty()) {
+            dos.write(METADATA_DEFAULT_RESPONSE.getBytes());
+            return;
+        }
+
+        dos.write("{\"PXFMetadata\":[".getBytes());
+
+        String prefix = "";
+        for (Metadata metadata : metadataList) {
+            if (metadata == null) {
+                throw new IllegalArgumentException("metadata object is null - cannot serialize");
+            }
+            if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+                throw new IllegalArgumentException("metadata for " + metadata.getItem() + " contains no fields - cannot serialize");
+            }
+            StringBuilder result = new StringBuilder();
+            result.append(prefix).append(mapper.writeValueAsString(metadata));
+            prefix = ",";
+            dos.write(result.toString().getBytes());
+        }
+
+        dos.write("]}".getBytes());
+    }
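+
+    /*
+     * For illustration, a response for a single item serializes to the shape
+     * below; the outer wrapper and the empty case are fixed by this class,
+     * while the inner field names depend on Metadata's JSON annotations and
+     * are only an assumed example:
+     *
+     *   {"PXFMetadata":[{"item":{...},"fields":[{"name":"id","type":"int8"}, ...]}]}
+     *
+     * A null or empty list serializes to {"PXFMetadata":[]}.
+     */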
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
new file mode 100644
index 0000000..8225ec5
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
@@ -0,0 +1,95 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import org.apache.hawq.pxf.api.Metadata;
+
+/**
+ * Utility class for converting {@link Metadata} into a JSON format.
+ */
+public class MetadataResponseFormatter {
+
+    private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class);
+
+    /**
+     * Converts list of {@link Metadata} to JSON String format.
+     *
+     * @param metadataList list of metadata objects to convert
+     * @param path path string
+     * @return JSON formatted response
+     * @throws IOException if converting the data to JSON fails
+     */
+    public static MetadataResponse formatResponse(List<Metadata> metadataList, String path) throws IOException {
+        /* print the metadata list to the log when debug level is enabled */
+        if (LOG.isDebugEnabled()) {
+            MetadataResponseFormatter.printMetadata(metadataList, path);
+        }
+
+        return new MetadataResponse(metadataList);
+    }
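+
+    /*
+     * A minimal usage sketch, e.g. from a REST resource that has already
+     * fetched the metadata; `metadataList` and `path` are placeholders, and
+     * the JAX-RS wrapping shown is only an assumed calling pattern:
+     *
+     *   MetadataResponse response = MetadataResponseFormatter.formatResponse(metadataList, path);
+     *   return Response.ok(response, MediaType.APPLICATION_JSON_TYPE).build();
+     */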
+
+    /**
+     * Converts metadata list to a readable string.
+     * Intended for debugging purposes only.
+     */
+    private static void printMetadata(List<Metadata> metadataList, String path) {
+        LOG.debug("Metadata List for path " + path + ": ");
+
+        if (null == metadataList || metadataList.isEmpty()) {
+            LOG.debug("No metadata");
+            return;
+        }
+
+        for (Metadata metadata : metadataList) {
+            StringBuilder result = new StringBuilder();
+
+            if (metadata == null) {
+                result.append("None");
+                LOG.debug(result);
+                continue;
+            }
+
+            result.append("Metadata for item \"").append(metadata.getItem()).append("\": ");
+
+            if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+                result.append("None");
+            } else {
+                int i = 0;
+                for (Metadata.Field field : metadata.getFields()) {
+                    result.append("Field #").append(++i).append(": [")
+                            .append("Name: ").append(field.getName())
+                            .append(", Type: ").append(field.getType().getTypeName())
+                            .append(", Source type: ").append(field.getSourceType()).append("] ");
+                }
+            }
+            LOG.debug(result);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
new file mode 100644
index 0000000..01a95ab
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
@@ -0,0 +1,179 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.nio.charset.CharacterCodingException;
+import java.util.LinkedList;
+import java.util.zip.ZipException;
+
+/**
+ * ReadBridge class creates the appropriate accessor and resolver. It then
+ * creates the correct output conversion class (e.g. Text or GPDBWritable),
+ * gets records from the accessor, lets the resolver deserialize them, and
+ * reserializes them using the output conversion class. <br>
+ * The class handles BadRecordException and other exception types, and marks
+ * the record as invalid for HAWQ.
+ */
+public class ReadBridge implements Bridge {
+    ReadAccessor fileAccessor = null;
+    ReadResolver fieldsResolver = null;
+    BridgeOutputBuilder outputBuilder = null;
+    LinkedList<Writable> outputQueue = null;
+
+    private static final Log LOG = LogFactory.getLog(ReadBridge.class);
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing accessor and resolver names
+     * @throws Exception if accessor or resolver can't be instantiated
+     */
+    public ReadBridge(ProtocolData protData) throws Exception {
+        outputBuilder = new BridgeOutputBuilder(protData);
+        outputQueue = new LinkedList<Writable>();
+        fileAccessor = getFileAccessor(protData);
+        fieldsResolver = getFieldsResolver(protData);
+    }
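+
+    /*
+     * A minimal sketch of the intended read loop; the caller (e.g. a REST
+     * resource) drives the bridge, and `protData` and `dos` are hypothetical
+     * placeholders for the request data and the output stream:
+     *
+     *   Bridge bridge = new ReadBridge(protData);
+     *   if (bridge.beginIteration()) {
+     *       Writable record;
+     *       while ((record = bridge.getNext()) != null) {
+     *           record.write(dos); // stream the record back to HAWQ
+     *       }
+     *   }
+     */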
+
+    /**
+     * Accesses the underlying HDFS file.
+     */
+    @Override
+    public boolean beginIteration() throws Exception {
+        return fileAccessor.openForRead();
+    }
+
+    /**
+     * Fetches the next object from the file and turns it into a record that
+     * the HAWQ backend can process.
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        Writable output = null;
+        OneRow onerow = null;
+
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
+        try {
+            while (outputQueue.isEmpty()) {
+                onerow = fileAccessor.readNextObject();
+                if (onerow == null) {
+                    fileAccessor.closeForRead();
+                    output = outputBuilder.getPartialLine();
+                    if (output != null) {
+                        LOG.warn("A partial record in the end of the fragment");
+                    }
+                    // if there is a partial line, return it now, otherwise it
+                    // will return null
+                    return output;
+                }
+
+                // we checked above that outputQueue is empty, so we can
+                // safely overwrite it.
+                outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                if (!outputQueue.isEmpty()) {
+                    output = outputQueue.pop();
+                    break;
+                }
+            }
+        } catch (IOException ex) {
+            if (!isDataException(ex)) {
+                fileAccessor.closeForRead();
+                throw ex;
+            }
+            output = outputBuilder.getErrorOutput(ex);
+        } catch (BadRecordException ex) {
+            String row_info = "null";
+            if (onerow != null) {
+                row_info = onerow.toString();
+            }
+            if (ex.getCause() != null) {
+                LOG.debug("BadRecordException " + ex.getCause().toString()
+                        + ": " + row_info);
+            } else {
+                LOG.debug(ex.toString() + ": " + row_info);
+            }
+            output = outputBuilder.getErrorOutput(ex);
+        } catch (Exception ex) {
+            fileAccessor.closeForRead();
+            throw ex;
+        }
+
+        return output;
+    }
+
+    public static ReadAccessor getFileAccessor(InputData inputData)
+            throws Exception {
+        return (ReadAccessor) Utilities.createAnyInstance(InputData.class,
+                inputData.getAccessor(), inputData);
+    }
+
+    public static ReadResolver getFieldsResolver(InputData inputData)
+            throws Exception {
+        return (ReadResolver) Utilities.createAnyInstance(InputData.class,
+                inputData.getResolver(), inputData);
+    }
+
+    /*
+     * There are many exceptions that inherit from IOException. Some of them,
+     * like EOFException, are generated due to a data problem, and not because
+     * of an IO/connection problem as the parent IOException might lead us to
+     * believe. For example, an EOFException will be thrown while fetching a
+     * record from a sequence file if there is a formatting problem in the
+     * record. Fetching records from the sequence file is the responsibility
+     * of the accessor, so the exception will be thrown from the accessor. We
+     * identify these cases by analyzing the exception type, and when we
+     * discover that the actual problem was a data problem, we return the
+     * errorOutput GPDBWritable.
+     */
+    private boolean isDataException(IOException ex) {
+        return (ex instanceof EOFException
+                || ex instanceof CharacterCodingException
+                || ex instanceof CharConversionException
+                || ex instanceof UTFDataFormatException || ex instanceof ZipException);
+    }
+
+    @Override
+    public boolean setNext(DataInputStream inputStream) {
+        throw new UnsupportedOperationException("setNext is not implemented");
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        boolean result = ((Plugin) fileAccessor).isThreadSafe()
+                && ((Plugin) fieldsResolver).isThreadSafe();
+        LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
new file mode 100644
index 0000000..d5ae66a
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -0,0 +1,131 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInputStream;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+/**
+ * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output
+ * records, based on a sampling ratio. The decision to pass or discard a
+ * record is made after all of the processing is completed
+ * ({@code accessor -> resolver -> output builder}) to make sure single
+ * records are returned rather than chunks of data. <br>
+ * The goal is to make the sampling as uniform as possible. This is achieved
+ * by creating a bit map matching the precision of the sampleRatio, so that
+ * for a ratio of 0.034, a bit map of 1000 bits will be created, and 34 bits
+ * will be set. This map is matched against each read record, discarding ones
+ * with a 0 bit and continuing until a 1 bit record is read.
+ */
+public class ReadSamplingBridge implements Bridge {
+
+    ReadBridge bridge;
+
+    float sampleRatio;
+    BitSet sampleBitSet;
+    int bitSetSize;
+    int sampleSize;
+    int curIndex;
+
+    private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class);
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing sampling ratio
+     * @throws Exception if the sampling ratio is wrong
+     */
+    public ReadSamplingBridge(ProtocolData protData) throws Exception {
+        bridge = new ReadBridge(protData);
+
+        this.sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio < 0.0001 || sampleRatio > 1.0) {
+            throw new IllegalArgumentException(
+                    "sampling ratio must be a value between 0.0001 and 1.0. "
+                            + "(value = " + sampleRatio + ")");
+        }
+
+        calculateBitSetSize();
+
+        this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize,
+                sampleSize);
+        this.curIndex = 0;
+    }
+
+    private void calculateBitSetSize() {
+
+        sampleSize = (int) (sampleRatio * 10000);
+        bitSetSize = 10000;
+
+        while ((bitSetSize > 100) && (sampleSize % 10 == 0)) {
+            bitSetSize /= 10;
+            sampleSize /= 10;
+        }
+        LOG.debug("bit set size = " + bitSetSize + " sample size = "
+                + sampleSize);
+    }
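+
+    /*
+     * Worked example: for sampleRatio = 0.034, sampleSize starts at 340 and
+     * bitSetSize at 10000; one reduction step (340 % 10 == 0) yields a bit
+     * set of 1000 bits with 34 bits set, matching the class documentation.
+     * For sampleRatio = 0.5, the loop stops at bitSetSize = 100 with
+     * sampleSize = 50, since the bit set is never reduced below 100 bits.
+     */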
+
+    /**
+     * Fetches next sample, according to the sampling ratio.
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        Writable output = bridge.getNext();
+
+        // sample - if bit is false, advance to the next object
+        while (!sampleBitSet.get(curIndex)) {
+
+            if (output == null) {
+                break;
+            }
+            incIndex();
+            output = bridge.getNext();
+        }
+
+        incIndex();
+        return output;
+    }
+
+    private void incIndex() {
+        curIndex = (curIndex + 1) % bitSetSize;
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        return bridge.beginIteration();
+    }
+
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+        return bridge.setNext(inputStream);
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return bridge.isThreadSafe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e2416f49/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
new file mode 100644
index 0000000..c3ee731
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
@@ -0,0 +1,117 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.util.List;
+
+/*
+ * WriteBridge class creates the appropriate accessor and resolver.
+ * It reads data from the input stream, converts it into records with the
+ * resolver, and writes them to the Hadoop storage with the accessor.
+ */
+public class WriteBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(WriteBridge.class);
+    WriteAccessor fileAccessor = null;
+    WriteResolver fieldsResolver = null;
+    BridgeInputBuilder inputBuilder;
+
+    /*
+     * C'tor - set the implementation of the bridge
+     */
+    public WriteBridge(ProtocolData protocolData) throws Exception {
+
+        inputBuilder = new BridgeInputBuilder(protocolData);
+        /* plugins accept InputData parameters */
+        fileAccessor = getFileAccessor(protocolData);
+        fieldsResolver = getFieldsResolver(protocolData);
+
+    }
+
+    /*
+     * Accesses the underlying HDFS file
+     */
+    @Override
+    public boolean beginIteration() throws Exception {
+        return fileAccessor.openForWrite();
+    }
+
+    /*
+     * Reads data from the stream, converts it into a OneRow object using the
+     * WriteResolver, and passes it to the WriteAccessor to write into the file.
+     */
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+        if (record == null) {
+            close();
+            return false;
+        }
+
+        OneRow onerow = fieldsResolver.setFields(record);
+        if (onerow == null) {
+            close();
+            return false;
+        }
+        if (!fileAccessor.writeNextObject(onerow)) {
+            close();
+            throw new BadRecordException();
+        }
+        return true;
+    }
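+
+    /*
+     * A minimal sketch of the intended write loop; the caller streams data
+     * coming from HAWQ, and `protData` and `inputStream` are hypothetical
+     * placeholders for the request data and the request body stream:
+     *
+     *   Bridge bridge = new WriteBridge(protData);
+     *   bridge.beginIteration();
+     *   while (bridge.setNext(inputStream)) {
+     *       // each call consumes one record and writes it via the accessor
+     *   }
+     */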
+
+    private void close() throws Exception {
+        try {
+            fileAccessor.closeForWrite();
+        } catch (Exception e) {
+            LOG.error("Failed to close bridge resources: " + e.getMessage());
+            throw e;
+        }
+    }
+
+    private static WriteAccessor getFileAccessor(InputData inputData) throws Exception {
+        return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    }
+
+    private static WriteResolver getFieldsResolver(InputData inputData) throws Exception {
+        return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    }
+
+    @Override
+    public Writable getNext() {
+        throw new UnsupportedOperationException("getNext is not implemented");
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+    }
+}