You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by es...@apache.org on 2017/02/03 09:00:47 UTC

[45/50] [abbrv] incubator-hawq git commit: HAWQ-1228. Use profile based on file format in HCatalog integration(HiveRC, HiveText profiles).

HAWQ-1228. Use profile based on file format in HCatalog integration(HiveRC, HiveText profiles).


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/6fa1ced2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/6fa1ced2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/6fa1ced2

Branch: refs/heads/2.1.0.0-incubating
Commit: 6fa1ced20e8bb2820b73e6904f77c4b4a1ed6de2
Parents: aac8868
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Mon Jan 30 23:38:06 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Mon Jan 30 23:38:50 2017 -0800

----------------------------------------------------------------------
 pxf/gradle.properties                           |   2 +-
 .../java/org/apache/hawq/pxf/api/Metadata.java  |  51 +++-
 .../org/apache/hawq/pxf/api/OutputFormat.java   |  37 ++-
 .../hawq/pxf/api/utilities/InputData.java       |   1 +
 .../org/apache/hawq/pxf/api/MetadataTest.java   |   2 +-
 .../hawq/pxf/plugins/hive/HiveAccessor.java     |  14 +-
 .../plugins/hive/HiveColumnarSerdeResolver.java |  60 ++---
 .../pxf/plugins/hive/HiveDataFragmenter.java    |  28 +-
 .../plugins/hive/HiveInputFormatFragmenter.java |  41 ---
 .../pxf/plugins/hive/HiveLineBreakAccessor.java |  10 +-
 .../pxf/plugins/hive/HiveMetadataFetcher.java   |  85 +++---
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  |   9 +-
 .../pxf/plugins/hive/HiveORCSerdeResolver.java  |  44 +---
 .../pxf/plugins/hive/HiveRCFileAccessor.java    |  10 +-
 .../hawq/pxf/plugins/hive/HiveResolver.java     | 107 ++++----
 .../plugins/hive/HiveStringPassResolver.java    |  39 ++-
 .../hawq/pxf/plugins/hive/HiveUserData.java     | 135 ++++++++++
 .../hive/utilities/EnumHiveToHawqType.java      |  31 ++-
 .../plugins/hive/utilities/HiveUtilities.java   | 263 +++++++++++++++----
 .../plugins/hive/utilities/ProfileFactory.java  |  61 +++++
 .../plugins/hive/HiveMetadataFetcherTest.java   |   3 +
 .../pxf/plugins/hive/HiveORCAccessorTest.java   |   9 +-
 .../hive/utilities/HiveUtilitiesTest.java       |  53 ++++
 .../hive/utilities/ProfileFactoryTest.java      |  65 +++++
 .../hawq/pxf/service/BridgeOutputBuilder.java   |   8 +-
 .../pxf/service/MetadataResponseFormatter.java  |   3 +-
 .../apache/hawq/pxf/service/ProfileFactory.java |  45 ----
 .../hawq/pxf/service/rest/MetadataResource.java |   9 +-
 .../hawq/pxf/service/rest/VersionResource.java  |   2 +-
 .../pxf/service/utilities/ProtocolData.java     |  22 +-
 .../src/main/resources/pxf-profiles-default.xml |  14 +-
 .../service/MetadataResponseFormatterTest.java  |  16 +-
 src/backend/access/external/fileam.c            |   3 +
 src/backend/access/external/pxfheaders.c        |  21 +-
 .../access/external/test/pxfheaders_test.c      |  18 ++
 src/backend/catalog/external/externalmd.c       | 137 +++++++---
 src/bin/gpfusion/gpbridgeapi.c                  |   6 +-
 src/include/access/hd_work_mgr.h                |   2 +
 src/include/access/pxfheaders.h                 |   1 +
 src/include/access/pxfuriparser.h               |   2 +-
 src/include/catalog/external/itemmd.h           |   5 +
 src/include/catalog/pg_exttable.h               |  14 +-
 .../regress/data/hcatalog/single_table.json     |   2 +-
 .../data/hcatalog/single_table_text.json        |   1 +
 src/test/regress/input/json_load.source         |  12 +-
 src/test/regress/json_utils.c                   |  24 +-
 src/test/regress/output/json_load.source        |  35 ++-
 47 files changed, 1109 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/gradle.properties
----------------------------------------------------------------------
diff --git a/pxf/gradle.properties b/pxf/gradle.properties
index b003c56..2af17ef 100644
--- a/pxf/gradle.properties
+++ b/pxf/gradle.properties
@@ -23,5 +23,5 @@ hiveVersion=1.2.1
 hbaseVersionJar=1.1.2
 hbaseVersionRPM=1.1.2
 tomcatVersion=7.0.62
-pxfProtocolVersion=v14
+pxfProtocolVersion=v15
 osFamily=el6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index 9e1c137..bb22d41 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -22,6 +22,8 @@ package org.apache.hawq.pxf.api;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.commons.lang.StringUtils;
@@ -68,14 +70,16 @@ public class Metadata {
     }
 
     /**
-     * Class representing item field - name, type, source type, modifiers.
+     * Class representing item field - name, type, source type, is complex type?, modifiers.
      * Type - exposed type of field
      * Source type - type of field in underlying source
+     * Is complex type - whether source type is complex type
      * Modifiers - additional attributes which describe type or field
      */
     public static class Field {
         private String name;
         private EnumHawqType type; // field type which PXF exposes
+        private boolean isComplexType; // whether source field's type is complex
         private String sourceType; // field type PXF reads from
         private String[] modifiers; // type modifiers, optional field
 
@@ -91,12 +95,17 @@ public class Metadata {
             this.sourceType = sourceType;
         }
 
-        public Field(String name, EnumHawqType type, String sourceType,
-                String[] modifiers) {
+        public Field(String name, EnumHawqType type, String sourceType, String[] modifiers) {
             this(name, type, sourceType);
             this.modifiers = modifiers;
         }
 
+        public Field(String name, EnumHawqType type, boolean isComplexType, String sourceType, String[] modifiers) {
+            this(name, type, sourceType);
+            this.modifiers = modifiers;
+            this.isComplexType = isComplexType;
+        }
+
         public String getName() {
             return name;
         }
@@ -112,6 +121,14 @@ public class Metadata {
         public String[] getModifiers() {
             return modifiers;
         }
+
+        public boolean isComplexType() {
+            return isComplexType;
+        }
+
+        public void setComplexType(boolean isComplexType) {
+            this.isComplexType = isComplexType;
+        }
     }
 
     /**
@@ -123,6 +140,34 @@ public class Metadata {
      * Item's fields
      */
     private List<Metadata.Field> fields;
+    private Set<OutputFormat> outputFormats;
+    private Map<String, String> outputParameters;
+
+    /**
+     * Returns an item's output formats, @see OutputFormat.
+     *
+     * @return item's output formats
+     */
+    public Set<OutputFormat> getOutputFormats() {
+        return outputFormats;
+    }
+
+    public void setOutputFormats(Set<OutputFormat> outputFormats) {
+        this.outputFormats = outputFormats;
+    }
+
+    /**
+     * Returns an item's output parameters, for example - delimiters etc.
+     *
+     * @return item's output parameters
+     */
+    public Map<String, String> getOutputParameters() {
+        return outputParameters;
+    }
+
+    public void setOutputParameters(Map<String, String> outputParameters) {
+        this.outputParameters = outputParameters;
+    }
 
     /**
      * Constructs an item's Metadata.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
index 230f9ff..565db13 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
@@ -21,6 +21,39 @@ package org.apache.hawq.pxf.api;
 
 
 /**
- * PXF supported output formats: {@link #TEXT} and {@link #BINARY}
+ * PXF supported output formats: {@link org.apache.hawq.pxf.service.io.Text} and {@link org.apache.hawq.pxf.service.io.GPDBWritable}
  */
-public enum OutputFormat {TEXT, BINARY}
+public enum OutputFormat {
+    TEXT("org.apache.hawq.pxf.service.io.Text"),
+    GPDBWritable("org.apache.hawq.pxf.service.io.GPDBWritable");
+
+    private String className;
+
+    OutputFormat(String className) {
+        this.className = className;
+    }
+
+    /**
+     * Returns a formats's implementation class name
+     *
+     * @return a formats's implementation class name
+     */
+    public String getClassName() {
+        return className;
+    }
+
+    /**
+     * Looks up output format for given class name if it exists.
+     *
+     * @throws UnsupportedTypeException if output format with given class wasn't found
+     * @return an output format with given class name
+     */
+    public static OutputFormat getOutputFormat(String className) {
+        for (OutputFormat of : values()) {
+            if (of.getClassName().equals(className)) {
+                return of;
+            }
+        }
+        throw new UnsupportedTypeException("Unable to find output format by given class name: " + className);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 5afedca..9816fdc 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -31,6 +31,7 @@ import java.util.*;
  */
 public class InputData {
 
+    public static final String DELIMITER_KEY = "DELIMITER";
     public static final int INVALID_SPLIT_IDX = -1;
     private static final Log LOG = LogFactory.getLog(InputData.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
index 327a15b..9244ba2 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
@@ -32,7 +32,7 @@ public class MetadataTest {
     @Test
     public void createFieldEmptyNameType() {
         try {
-            Metadata.Field field = new Metadata.Field(null, null, null, null);
+            Metadata.Field field = new Metadata.Field(null, null, false, null, null);
             fail("Empty name, type and source type shouldn't be allowed.");
         } catch (IllegalArgumentException e) {
             assertEquals("Field name, type and source type cannot be empty", e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
index ef9f76e..ea3accb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
@@ -28,6 +28,7 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.InputFormat;
@@ -42,10 +43,6 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.hawq.pxf.api.io.DataType.*;
-import static org.apache.hawq.pxf.api.io.DataType.BPCHAR;
-import static org.apache.hawq.pxf.api.io.DataType.BYTEA;
-
 /**
  * Accessor for Hive tables. The accessor will open and read a split belonging
  * to a Hive table. Opening a split means creating the corresponding InputFormat
@@ -138,12 +135,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
      */
     private InputFormat<?, ?> createInputFormat(InputData input)
             throws Exception {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
-        initPartitionFields(toks[3]);
-        filterInFragmenter = new Boolean(toks[4]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
         return HiveDataFragmenter.makeInputFormat(
-                toks[0]/* inputFormat name */, jobConf);
+                hiveUserData.getInputFormatName()/* inputFormat name */, jobConf);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 362ac0d..7d85efe 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -22,12 +22,15 @@ package org.apache.hawq.pxf.plugins.hive;
 import org.apache.hawq.pxf.api.BadRecordException;
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -57,11 +59,10 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
  */
 public class HiveColumnarSerdeResolver extends HiveResolver {
     private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
-    private ColumnarSerDeBase deserializer;
     private boolean firstColumn;
     private StringBuilder builder;
     private StringBuilder parts;
-    private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+    private HiveUtilities.PXF_HIVE_SERDES serdeType;
 
     public HiveColumnarSerdeResolver(InputData input) throws Exception {
         super(input);
@@ -70,24 +71,22 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
     /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
     @Override
     void parseUserData(InputData input) throws Exception {
-        String[] toks = HiveInputFormatFragmenter.parseToks(input);
-        String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
-        if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) {
-            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
-        } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
-            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
-        }
-        else {
-            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
-        }
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+        String serdeClassName = hiveUserData.getSerdeClassName();
+
+        serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(serdeClassName);
         parts = new StringBuilder();
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        partitionKeys = hiveUserData.getPartitionKeys();
         parseDelimiterChar(input);
     }
 
     @Override
     void initPartitionFields() {
-        initPartitionFields(parts);
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            initTextPartitionFields(parts);
+        } else {
+            super.initPartitionFields();
+        }
     }
 
     /**
@@ -97,15 +96,19 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
      */
     @Override
     public List<OneField> getFields(OneRow onerow) throws Exception {
-        firstColumn = true;
-        builder = new StringBuilder();
-        Object tuple = deserializer.deserialize((Writable) onerow.getData());
-        ObjectInspector oi = deserializer.getObjectInspector();
-
-        traverseTuple(tuple, oi);
-        /* We follow Hive convention. Partition fields are always added at the end of the record */
-        builder.append(parts);
-        return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            firstColumn = true;
+            builder = new StringBuilder();
+            Object tuple = deserializer.deserialize((Writable) onerow.getData());
+            ObjectInspector oi = deserializer.getObjectInspector();
+    
+            traverseTuple(tuple, oi);
+            /* We follow Hive convention. Partition fields are always added at the end of the record */
+            builder.append(parts);
+            return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+        } else {
+            return super.getFields(onerow);
+        }
     }
 
     /*
@@ -138,14 +141,7 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
         serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
         serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
 
-        if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
-            deserializer = new ColumnarSerDe();
-        } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
-            deserializer = new LazyBinaryColumnarSerDe();
-        } else {
-            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
-        }
-
+        deserializer = HiveUtilities.createDeserializer(serdeType, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
         deserializer.initialize(new JobConf(new Configuration(), HiveColumnarSerdeResolver.class), serdeProperties);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 2d2b53e..a03d3b7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -59,7 +59,7 @@ import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
-import org.apache.hawq.pxf.service.ProfileFactory;
+import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;
 
 /**
  * Fragmenter class for HIVE tables. <br>
@@ -78,7 +78,6 @@ public class HiveDataFragmenter extends Fragmenter {
     private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class);
     private static final short ALL_PARTS = -1;
 
-    public static final String HIVE_UD_DELIM = "!HUDD!";
     public static final String HIVE_1_PART_DELIM = "!H1PD!";
     public static final String HIVE_PARTITIONS_DELIM = "!HPAD!";
     public static final String HIVE_NO_PART_TBL = "!HNPT!";
@@ -163,6 +162,10 @@ public class HiveDataFragmenter extends Fragmenter {
 
         Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
 
+        Metadata metadata = new Metadata(tblDesc);
+        HiveUtilities.getSchema(tbl, metadata);
+        boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata);
+
         verifySchema(tbl);
 
         List<Partition> partitions = null;
@@ -228,7 +231,7 @@ public class HiveDataFragmenter extends Fragmenter {
 
         if (partitions.isEmpty()) {
             props = getSchema(tbl);
-            fetchMetaDataForSimpleTable(descTable, props);
+            fetchMetaDataForSimpleTable(descTable, props, hasComplexTypes);
         } else {
             List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
 
@@ -239,7 +242,7 @@ public class HiveDataFragmenter extends Fragmenter {
                         tblDesc.getPath(), tblDesc.getName(),
                         partitionKeys);
                 fetchMetaDataForPartitionedTable(descPartition, props,
-                        partition, partitionKeys, tblDesc.getName());
+                        partition, partitionKeys, tblDesc.getName(), hasComplexTypes);
             }
         }
     }
@@ -255,29 +258,30 @@ public class HiveDataFragmenter extends Fragmenter {
     }
 
     private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
-                                             Properties props) throws Exception {
-        fetchMetaDataForSimpleTable(stdsc, props, null);
+                                             Properties props, boolean hasComplexTypes) throws Exception {
+        fetchMetaDataForSimpleTable(stdsc, props, null, hasComplexTypes);
     }
 
     private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
-                                             Properties props, String tableName)
+                                             Properties props, String tableName, boolean hasComplexTypes)
             throws Exception {
         fetchMetaData(new HiveTablePartition(stdsc, props, null, null,
-                tableName));
+                tableName), hasComplexTypes);
     }
 
     private void fetchMetaDataForPartitionedTable(StorageDescriptor stdsc,
                                                   Properties props,
                                                   Partition partition,
                                                   List<FieldSchema> partitionKeys,
-                                                  String tableName)
+                                                  String tableName,
+                                                  boolean hasComplexTypes)
             throws Exception {
         fetchMetaData(new HiveTablePartition(stdsc, props, partition,
-                partitionKeys, tableName));
+                partitionKeys, tableName), hasComplexTypes);
     }
 
     /* Fills a table partition */
-    private void fetchMetaData(HiveTablePartition tablePartition)
+    private void fetchMetaData(HiveTablePartition tablePartition, boolean hasComplexTypes)
             throws Exception {
         InputFormat<?, ?> fformat = makeInputFormat(
                 tablePartition.storageDesc.getInputFormat(), jobConf);
@@ -285,7 +289,7 @@ public class HiveDataFragmenter extends Fragmenter {
         if (inputData.getProfile() != null) {
             // evaluate optimal profile based on file format if profile was explicitly specified in url
             // if user passed accessor+fragmenter+resolver - use them
-            profile = ProfileFactory.get(fformat);
+            profile = ProfileFactory.get(fformat, hasComplexTypes);
         }
         String fragmenterForProfile = null;
         if (profile != null) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index ca4501b..9199118 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -55,11 +55,6 @@ import java.util.Properties;
  */
 public class HiveInputFormatFragmenter extends HiveDataFragmenter {
     private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
-    private static final int EXPECTED_NUM_OF_TOKS = 4;
-    public static final int TOK_SERDE = 0;
-    public static final int TOK_KEYS = 1;
-    public static final int TOK_FILTER_DONE = 2;
-    public static final int TOK_COL_TYPES = 3;
 
     /** Defines the Hive input formats currently supported in pxf */
     public enum PXF_HIVE_INPUT_FORMATS {
@@ -68,14 +63,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
         ORC_FILE_INPUT_FORMAT
     }
 
-    /** Defines the Hive serializers (serde classes) currently supported in pxf */
-    public enum PXF_HIVE_SERDES {
-        COLUMNAR_SERDE,
-        LAZY_BINARY_COLUMNAR_SERDE,
-        LAZY_SIMPLE_SERDE,
-        ORC_SERDE
-    }
-
     /**
      * Constructs a HiveInputFormatFragmenter.
      *
@@ -85,34 +72,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
         super(inputData, HiveInputFormatFragmenter.class);
     }
 
-    /**
-     * Extracts the user data:
-     * serde, partition keys and whether filter was included in fragmenter
-     *
-     * @param input input data from client
-     * @param supportedSerdes supported serde names
-     * @return parsed tokens
-     * @throws UserDataException if user data contains unsupported serde
-     *                           or wrong number of tokens
-     */
-    static public String[] parseToks(InputData input, String... supportedSerdes)
-            throws UserDataException {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HIVE_UD_DELIM);
-        if (supportedSerdes.length > 0
-                && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) {
-            throw new UserDataException(toks[TOK_SERDE]
-                    + " serializer isn't supported by " + input.getAccessor());
-        }
-
-        if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
-            throw new UserDataException("HiveInputFormatFragmenter expected "
-                    + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
-        }
-
-        return toks;
-    }
-
     /*
      * Checks that hive fields and partitions match the HAWQ schema. Throws an
      * exception if: - the number of fields (+ partitions) do not match the HAWQ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
index ed4f908..66680bb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
@@ -21,12 +21,12 @@ package org.apache.hawq.pxf.plugins.hive;
 
 
 import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.hadoop.mapred.*;
 
 import java.io.IOException;
 
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Specialization of HiveAccessor for a Hive table stored as Text files.
@@ -43,9 +43,9 @@ public class HiveLineBreakAccessor extends HiveAccessor {
     public HiveLineBreakAccessor(InputData input) throws Exception {
         super(input, new TextInputFormat());
         ((TextInputFormat) inputFormat).configure(jobConf);
-        String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name());
-        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
-        filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 91f91e7..dc76289 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -21,33 +21,49 @@ package org.apache.hawq.pxf.plugins.hive;
 
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;
 
 /**
  * Class for connecting to Hive's MetaStore and getting schema of Hive tables.
  */
 public class HiveMetadataFetcher extends MetadataFetcher {
 
+    private static final String DELIM_FIELD = InputData.DELIMITER_KEY;
+
     private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
     private HiveMetaStoreClient client;
+    private JobConf jobConf;
 
     public HiveMetadataFetcher(InputData md) {
         super(md);
 
         // init hive metastore client connection.
         client = HiveUtilities.initHiveClient();
+        jobConf = new JobConf(new Configuration());
     }
 
     /**
@@ -82,8 +98,28 @@ public class HiveMetadataFetcher extends MetadataFetcher {
             try {
                 Metadata metadata = new Metadata(tblDesc);
                 Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
-                getSchema(tbl, metadata);
+                HiveUtilities.getSchema(tbl, metadata);
+                boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata);
                 metadataList.add(metadata);
+                List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1);
+                Set<OutputFormat> formats = new HashSet<OutputFormat>();
+                //If table has partitions - find out all formats
+                for (Partition tablePartition : tablePartitions) {
+                    String inputFormat = tablePartition.getSd().getInputFormat();
+                    OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes);
+                    formats.add(outputFormat);
+                }
+                //If table has no partitions - get single format of table
+                if (tablePartitions.size() == 0 ) {
+                    String inputFormat = tbl.getSd().getInputFormat();
+                    OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes);
+                    formats.add(outputFormat);
+                }
+                metadata.setOutputFormats(formats);
+                Map<String, String> outputParameters = new HashMap<String, String>();
+                Integer delimiterCode = HiveUtilities.getDelimiterCode(tbl.getSd());
+                outputParameters.put(DELIM_FIELD, delimiterCode.toString());
+                metadata.setOutputParameters(outputParameters);
             } catch (UnsupportedTypeException | UnsupportedOperationException e) {
                 if(ignoreErrors) {
                     LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage());
@@ -97,42 +133,13 @@ public class HiveMetadataFetcher extends MetadataFetcher {
         return metadataList;
     }
 
-
-    /**
-     * Populates the given metadata object with the given table's fields and partitions,
-     * The partition fields are added at the end of the table schema.
-     * Throws an exception if the table contains unsupported field types.
-     * Supported HCatalog types: TINYINT,
-     * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP,
-     * DATE, DECIMAL, VARCHAR, CHAR.
-     *
-     * @param tbl Hive table
-     * @param metadata schema of given table
-     */
-    private void getSchema(Table tbl, Metadata metadata) {
-
-        int hiveColumnsSize = tbl.getSd().getColsSize();
-        int hivePartitionsSize = tbl.getPartitionKeysSize();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
-        }
-
-        // check hive fields
-        try {
-            List<FieldSchema> hiveColumns = tbl.getSd().getCols();
-            for (FieldSchema hiveCol : hiveColumns) {
-                metadata.addField(HiveUtilities.mapHiveType(hiveCol));
-            }
-            // check partition fields
-            List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
-            for (FieldSchema hivePart : hivePartitions) {
-                metadata.addField(HiveUtilities.mapHiveType(hivePart));
-            }
-        } catch (UnsupportedTypeException e) {
-            String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " +
-                    e.getMessage();
-            throw new UnsupportedTypeException(errorMsg);
-        }
+    private OutputFormat getOutputFormat(String inputFormat, boolean hasComplexTypes) throws Exception {
+        OutputFormat outputFormat = null;
+        InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf);
+        String profile = ProfileFactory.get(fformat, hasComplexTypes);
+        String outputFormatClassName = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT");
+        outputFormat = OutputFormat.getOutputFormat(outputFormatClassName);
+        return outputFormat;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index dc195f4..07348b0 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -30,6 +30,7 @@ import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.StringUtils;
 
 import java.sql.Date;
@@ -37,7 +38,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Specialization of HiveAccessor for a Hive table that stores only ORC files.
@@ -61,9 +62,9 @@ public class HiveORCAccessor extends HiveAccessor {
      */
     public HiveORCAccessor(InputData input) throws Exception {
         super(input, new OrcInputFormat());
-        String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.ORC_SERDE.name());
-        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
-        filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
index 93aa474..fec0ff0 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -34,6 +34,7 @@ import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 import java.util.*;
 
@@ -43,8 +44,7 @@ import java.util.*;
  */
 public class HiveORCSerdeResolver extends HiveResolver {
     private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
-    private OrcSerde deserializer;
-    private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+    private HiveUtilities.PXF_HIVE_SERDES serdeType;
     private String typesString;
 
     public HiveORCSerdeResolver(InputData input) throws Exception {
@@ -54,41 +54,16 @@ public class HiveORCSerdeResolver extends HiveResolver {
     /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
     @Override
     void parseUserData(InputData input) throws Exception {
-        String[] toks = HiveInputFormatFragmenter.parseToks(input);
-        String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
-        if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
-            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
-        } else {
-            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
-        }
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
-        typesString = toks[HiveInputFormatFragmenter.TOK_COL_TYPES];
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE);
+        serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName());
+        partitionKeys = hiveUserData.getPartitionKeys();
+        typesString = hiveUserData.getColTypes();
         collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
                 : input.getUserProperty("COLLECTION_DELIM");
         mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
                 : input.getUserProperty("MAPKEY_DELIM");
     }
 
-    /**
-     * getFields returns a singleton list of OneField item.
-     * OneField item contains two fields: an integer representing the VARCHAR type and a Java
-     * Object representing the field value.
-     */
-    @Override
-    public List<OneField> getFields(OneRow onerow) throws Exception {
-
-        Object tuple = deserializer.deserialize((Writable) onerow.getData());
-        // Each Hive record is a Struct
-        StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
-        List<OneField> record = traverseStruct(tuple, soi, false);
-
-        //Add partition fields if any
-        record.addAll(getPartitionFields());
-
-        return record;
-
-    }
-
     /*
      * Get and init the deserializer for the records of this Hive data fragment.
      * Suppress Warnings added because deserializer.initialize is an abstract function that is deprecated
@@ -127,12 +102,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
         serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
         serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
 
-        if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
-            deserializer = new OrcSerde();
-        } else {
-            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
-        }
-
+        deserializer = HiveUtilities.createDeserializer(serdeType, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE);
         deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
index 2686851..7132d7b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
@@ -21,7 +21,7 @@ package org.apache.hawq.pxf.plugins.hive;
 
 
 import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
 import org.apache.hadoop.mapred.FileSplit;
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
 
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Specialization of HiveAccessor for a Hive table that stores only RC files.
@@ -47,9 +47,9 @@ public class HiveRCFileAccessor extends HiveAccessor {
      */
     public HiveRCFileAccessor(InputData input) throws Exception {
         super(input, new RCFileInputFormat());
-        String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name());
-        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
-        filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.COLUMNAR_SERDE, PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index 3837f78..5646969 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -27,6 +27,7 @@ import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Plugin;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.CharUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,10 +75,10 @@ public class HiveResolver extends Plugin implements ReadResolver {
     protected static final String COLLECTION_DELIM = ",";
     protected String collectionDelim;
     protected String mapkeyDelim;
-    private SerDe deserializer;
+    protected SerDe deserializer;
     private List<OneField> partitionFields;
-    private String serdeName;
-    private String propsString;
+    protected String serdeClassName;
+    protected String propsString;
     String partitionKeys;
     protected char delimiter;
     String nullChar = "\\N";
@@ -133,19 +134,11 @@ public class HiveResolver extends Plugin implements ReadResolver {
 
     /* Parses user data string (arrived from fragmenter). */
     void parseUserData(InputData input) throws Exception {
-        final int EXPECTED_NUM_OF_TOKS = 5;
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
 
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
-
-        if (toks.length != EXPECTED_NUM_OF_TOKS) {
-            throw new UserDataException("HiveResolver expected "
-                    + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
-        }
-
-        serdeName = toks[1];
-        propsString = toks[2];
-        partitionKeys = toks[3];
+        serdeClassName = hiveUserData.getSerdeClassName();
+        propsString = hiveUserData.getPropertiesString();
+        partitionKeys = hiveUserData.getPartitionKeys();
 
         collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
                 : input.getUserProperty("COLLECTION_DELIM");
@@ -160,14 +153,16 @@ public class HiveResolver extends Plugin implements ReadResolver {
     void initSerde(InputData inputData) throws Exception {
         Properties serdeProperties;
 
-        Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader());
+        Class<?> c = Class.forName(serdeClassName, true, JavaUtils.getClassLoader());
         deserializer = (SerDe) c.newInstance();
         serdeProperties = new Properties();
-        ByteArrayInputStream inStream = new ByteArrayInputStream(
-                propsString.getBytes());
-        serdeProperties.load(inStream);
-        deserializer.initialize(new JobConf(conf, HiveResolver.class),
-                serdeProperties);
+        if (propsString != null ) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(propsString.getBytes());
+            serdeProperties.load(inStream);
+        } else {
+            throw new IllegalArgumentException("propsString is mandatory to initialize serde.");
+        }
+        deserializer.initialize(new JobConf(conf, HiveResolver.class), serdeProperties);
     }
 
     /*
@@ -271,7 +266,7 @@ public class HiveResolver extends Plugin implements ReadResolver {
      * The partition fields are initialized one time based on userData provided
      * by the fragmenter.
      */
-    void initPartitionFields(StringBuilder parts) {
+    void initTextPartitionFields(StringBuilder parts) {
         if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
             return;
         }
@@ -625,47 +620,49 @@ public class HiveResolver extends Plugin implements ReadResolver {
      */
     void parseDelimiterChar(InputData input) {
 
-        String userDelim = input.getUserProperty("DELIMITER");
+        String userDelim = input.getUserProperty(InputData.DELIMITER_KEY);
 
         if (userDelim == null) {
-            throw new IllegalArgumentException("DELIMITER is a required option");
-        }
-
-        final int VALID_LENGTH = 1;
-        final int VALID_LENGTH_HEX = 4;
-
-        if (userDelim.startsWith("\\x")) { // hexadecimal sequence
-
-            if (userDelim.length() != VALID_LENGTH_HEX) {
+            /* No DELIMITER in URL, try to get it from fragment's user data*/
+            HiveUserData hiveUserData = null;
+            try {
+                hiveUserData = HiveUtilities.parseHiveUserData(input);
+            } catch (UserDataException ude) {
+                throw new IllegalArgumentException("Couldn't parse user data to get " + InputData.DELIMITER_KEY);
+            }
+            if (hiveUserData.getDelimiter() == null) {
+                throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option");
+            }
+            delimiter = (char) Integer.valueOf(hiveUserData.getDelimiter()).intValue();
+        } else {
+            final int VALID_LENGTH = 1;
+            final int VALID_LENGTH_HEX = 4;
+            if (userDelim.startsWith("\\x")) { // hexadecimal sequence
+                if (userDelim.length() != VALID_LENGTH_HEX) {
+                    throw new IllegalArgumentException(
+                            "Invalid hexdecimal value for delimiter (got"
+                                    + userDelim + ")");
+                }
+                delimiter = (char) Integer.parseInt(
+                        userDelim.substring(2, VALID_LENGTH_HEX), 16);
+                if (!CharUtils.isAscii(delimiter)) {
+                    throw new IllegalArgumentException(
+                            "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+                                    + delimiter + ")");
+                }
+                return;
+            }
+            if (userDelim.length() != VALID_LENGTH) {
                 throw new IllegalArgumentException(
-                        "Invalid hexdecimal value for delimiter (got"
+                        "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
                                 + userDelim + ")");
             }
-
-            delimiter = (char) Integer.parseInt(
-                    userDelim.substring(2, VALID_LENGTH_HEX), 16);
-
-            if (!CharUtils.isAscii(delimiter)) {
+            if (!CharUtils.isAscii(userDelim.charAt(0))) {
                 throw new IllegalArgumentException(
                         "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
-                                + delimiter + ")");
+                                + userDelim + ")");
             }
-
-            return;
-        }
-
-        if (userDelim.length() != VALID_LENGTH) {
-            throw new IllegalArgumentException(
-                    "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
-                            + userDelim + ")");
+            delimiter = userDelim.charAt(0);
         }
-
-        if (!CharUtils.isAscii(userDelim.charAt(0))) {
-            throw new IllegalArgumentException(
-                    "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
-                            + userDelim + ")");
-        }
-
-        delimiter = userDelim.charAt(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index fdc5f69..76d5cad 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -22,7 +22,11 @@ package org.apache.hawq.pxf.plugins.hive;
 
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
 
 import java.util.Collections;
 import java.util.List;
@@ -42,21 +46,32 @@ public class HiveStringPassResolver extends HiveResolver {
 
     @Override
     void parseUserData(InputData input) throws Exception {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
         parseDelimiterChar(input);
         parts = new StringBuilder();
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        partitionKeys = hiveUserData.getPartitionKeys();
+        serdeClassName = hiveUserData.getSerdeClassName();
+
+        /* Needed only for GPDBWritable format*/
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.GPDBWritable) {
+            propsString = hiveUserData.getPropertiesString();
+        }
     }
 
     @Override
-    void initSerde(InputData input) {
-        /* nothing to do here */
+    void initSerde(InputData input) throws Exception {
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.GPDBWritable) {
+            super.initSerde(input);
+        }
     }
 
     @Override
     void initPartitionFields() {
-        initPartitionFields(parts);
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            initTextPartitionFields(parts);
+        } else {
+            super.initPartitionFields();
+        }
     }
 
     /**
@@ -66,9 +81,13 @@ public class HiveStringPassResolver extends HiveResolver {
      */
     @Override
     public List<OneField> getFields(OneRow onerow) throws Exception {
-        String line = (onerow.getData()).toString();
-
-        /* We follow Hive convention. Partition fields are always added at the end of the record */
-        return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            String line = (onerow.getData()).toString();
+            /* We follow Hive convention. Partition fields are always added at the end of the record */
+            return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+        } else {
+            return super.getFields(onerow);
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
new file mode 100644
index 0000000..e3632e0
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.plugins.hive;
+
+/**
+ * Class which is a carrier for user data in Hive fragment.
+ *
+ */
+public class HiveUserData {
+
+    public static final String HIVE_UD_DELIM = "!HUDD!";
+    private static final int EXPECTED_NUM_OF_TOKS = 7;
+
+    public HiveUserData(String inputFormatName, String serdeClassName,
+            String propertiesString, String partitionKeys,
+            boolean filterInFragmenter,
+            String delimiter,
+            String colTypes) {
+
+        this.inputFormatName = inputFormatName;
+        this.serdeClassName = serdeClassName;
+        this.propertiesString = propertiesString;
+        this.partitionKeys = partitionKeys;
+        this.filterInFragmenter = filterInFragmenter;
+        this.delimiter = (delimiter == null ? "0" : delimiter);
+        this.colTypes = colTypes;
+    }
+
+    /**
+     * Returns input format of a fragment
+     *
+     * @return input format of a fragment
+     */
+    public String getInputFormatName() {
+        return inputFormatName;
+    }
+
+    /**
+     * Returns SerDe class name
+     *
+     * @return SerDe class name
+     */
+    public String getSerdeClassName() {
+        return serdeClassName;
+    }
+
+    /**
+     * Returns properties string needed for SerDe initialization
+     *
+     * @return properties string needed for SerDe initialization
+     */
+    public String getPropertiesString() {
+        return propertiesString;
+    }
+
+    /**
+     * Returns partition keys
+     *
+     * @return partition keys
+     */
+    public String getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    /**
+     * Returns whether filtering was done in fragmenter
+     *
+     * @return true if filtering was done in fragmenter
+     */
+    public boolean isFilterInFragmenter() {
+        return filterInFragmenter;
+    }
+
+    /**
+     * Returns field delimiter
+     *
+     * @return field delimiter
+     */
+    public String getDelimiter() {
+        return delimiter;
+    }
+
+    public void setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    private String inputFormatName;
+    private String serdeClassName;
+    private String propertiesString;
+    private String partitionKeys;
+    private boolean filterInFragmenter;
+    private String delimiter;
+    private String colTypes;
+
+    /**
+     * The method returns expected number of tokens in raw user data
+     *
+     * @return number of tokens in raw user data
+     */
+    public static int getNumOfTokens() {
+        return EXPECTED_NUM_OF_TOKS;
+    }
+
+    @Override
+    public String toString() {
+        return inputFormatName + HiveUserData.HIVE_UD_DELIM
+                + serdeClassName + HiveUserData.HIVE_UD_DELIM
+                + propertiesString + HiveUserData.HIVE_UD_DELIM
+                + partitionKeys + HiveUserData.HIVE_UD_DELIM
+                + filterInFragmenter + HiveUserData.HIVE_UD_DELIM
+                + delimiter + HiveUserData.HIVE_UD_DELIM
+                + colTypes;
+    }
+
+    public String getColTypes() {
+        return colTypes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
index d91e949..ea65a66 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
@@ -42,37 +42,48 @@ public enum EnumHiveToHawqType {
     FloatType("float", EnumHawqType.Float4Type),
     DoubleType("double", EnumHawqType.Float8Type),
     StringType("string", EnumHawqType.TextType),
-    BinaryType("binary", EnumHawqType.ByteaType),
+    BinaryType("binary", EnumHawqType.ByteaType, true),
     TimestampType("timestamp", EnumHawqType.TimestampType),
     DateType("date", EnumHawqType.DateType),
     DecimalType("decimal", EnumHawqType.NumericType, "[(,)]"),
     VarcharType("varchar", EnumHawqType.VarcharType, "[(,)]"),
     CharType("char", EnumHawqType.BpcharType, "[(,)]"),
-    ArrayType("array", EnumHawqType.TextType, "[<,>]"),
-    MapType("map", EnumHawqType.TextType, "[<,>]"),
-    StructType("struct", EnumHawqType.TextType, "[<,>]"),
-    UnionType("uniontype", EnumHawqType.TextType, "[<,>]");
+    ArrayType("array", EnumHawqType.TextType, "[<,>]", true),
+    MapType("map", EnumHawqType.TextType, "[<,>]", true),
+    StructType("struct", EnumHawqType.TextType, "[<,>]", true),
+    UnionType("uniontype", EnumHawqType.TextType, "[<,>]", true);
 
     private String typeName;
     private EnumHawqType hawqType;
     private String splitExpression;
     private byte size;
+    private boolean isComplexType;
 
     EnumHiveToHawqType(String typeName, EnumHawqType hawqType) {
         this.typeName = typeName;
         this.hawqType = hawqType;
     }
-    
+
     EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) {
         this(typeName, hawqType);
         this.size = size;
     }
 
+    EnumHiveToHawqType(String typeName, EnumHawqType hawqType, boolean isComplexType) {
+        this(typeName, hawqType);
+        this.isComplexType = isComplexType;
+    }
+
     EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) {
         this(typeName, hawqType);
         this.splitExpression = splitExpression;
     }
 
+    EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression, boolean isComplexType) {
+        this(typeName, hawqType, splitExpression);
+        this.isComplexType = isComplexType;
+    }
+
     /**
      * 
      * @return name of type
@@ -216,4 +227,12 @@ public enum EnumHiveToHawqType {
         return size;
     }
 
+    public boolean isComplexType() {
+        return isComplexType;
+    }
+
+    public void setComplexType(boolean isComplexType) {
+        this.isComplexType = isComplexType;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index f7ebf4d..37f4ac2 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -35,17 +35,28 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.*;
 import org.apache.hawq.pxf.api.Fragmenter;
 import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.Metadata.Field;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
-import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.plugins.hive.HiveUserData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Class containing helper functions connecting
@@ -53,6 +64,46 @@ import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDE
  */
 public class HiveUtilities {
 
+    /** Defines the Hive serializers (serde classes) currently supported in pxf */
+    public enum PXF_HIVE_SERDES {
+        COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"),
+        LAZY_BINARY_COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"),
+        LAZY_SIMPLE_SERDE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+        ORC_SERDE("org.apache.hadoop.hive.ql.io.orc.OrcSerde");
+
+        private String serdeClassName;
+
+        PXF_HIVE_SERDES(String serdeClassName) {
+            this.serdeClassName = serdeClassName;
+        }
+
+        /**
+         * Method which looks up serde by serde class name.
+         *
+         * @param serdeClassName input serde name
+         * @param allowedSerdes all serdes which allowed in current context
+         * @return serde by given serde class name and list of allowed serdes
+         * @throws UnsupportedTypeException if unable to find serde by class name, or found serde which is not allowed in current context
+         */
+        public static PXF_HIVE_SERDES getPxfHiveSerde(String serdeClassName, PXF_HIVE_SERDES... allowedSerdes) {
+            for (PXF_HIVE_SERDES s : values()) {
+                if (s.getSerdeClassName().equals(serdeClassName)) {
+
+                    if (allowedSerdes.length > 0
+                            && !Arrays.asList(allowedSerdes).contains(s)) {
+                        throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeClassName);
+                    }
+                    return s;
+                }
+            }
+            throw new UnsupportedTypeException("Unable to find serde for class name: "+ serdeClassName);
+        }
+
+        public String getSerdeClassName() {
+            return serdeClassName;
+        }
+    }
+
     private static final Log LOG = LogFactory.getLog(HiveUtilities.class);
     private static final String WILDCARD = "*";
 
@@ -64,10 +115,7 @@ public class HiveUtilities {
     static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
     static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
     static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-    static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
-    static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
-    static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
-    static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+    private static final int DEFAULT_DELIMITER_CODE = 44;
 
     /**
      * Initializes the HiveMetaStoreClient
@@ -162,7 +210,7 @@ public class HiveUtilities {
         } else
             hiveTypeName = hiveType;
 
-        return new Metadata.Field(fieldName, hawqType, hiveTypeName, modifiers);
+        return new Metadata.Field(fieldName, hawqType, hiveToHawqType.isComplexType(), hiveTypeName, modifiers);
     }
 
     /**
@@ -376,31 +424,6 @@ public class HiveUtilities {
         }
     }
 
-    /*
-     * Validates that partition serde corresponds to PXF supported serdes and
-     * transforms the class name to an enumeration for writing it to the
-     * resolvers on other PXF instances.
-     */
-    private static String assertSerde(String className, HiveTablePartition partData)
-            throws Exception {
-        switch (className) {
-            case STR_COLUMNAR_SERDE:
-                return PXF_HIVE_SERDES.COLUMNAR_SERDE.name();
-            case STR_LAZY_BINARY_COLUMNAR_SERDE:
-                return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
-            case STR_LAZY_SIMPLE_SERDE:
-                return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
-            case STR_ORC_SERDE:
-                return PXF_HIVE_SERDES.ORC_SERDE.name();
-            default:
-                throw new UnsupportedTypeException(
-                        "HiveInputFormatFragmenter does not yet support  "
-                                + className + " for " + partData
-                                + ". Supported serializers are: "
-                                + Arrays.toString(PXF_HIVE_SERDES.values()));
-        }
-    }
-
 
     /* Turns the partition keys into a string */
     public static String serializePartitionKeys(HiveTablePartition partData) throws Exception {
@@ -429,10 +452,19 @@ public class HiveUtilities {
         return partitionKeys.toString();
     }
 
+    /**
+     * The method which serializes fragment-related attributes, needed for reading and resolution to string
+     *
+     * @param fragmenterClassName
+     * @param partData
+     * @param filterInFragmenter
+     * @return serialized representation of fragment-related attributes
+     * @throws Exception
+     */
     @SuppressWarnings("unchecked")
     public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception {
 
-        String userData = null;
+        HiveUserData hiveUserData = null;
 
         if (fragmenterClassName == null) {
             throw new IllegalArgumentException("No fragmenter provided.");
@@ -440,25 +472,158 @@ public class HiveUtilities {
 
         Class fragmenterClass = Class.forName(fragmenterClassName);
 
+        String inputFormatName = partData.storageDesc.getInputFormat();
+        String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib();
+        String propertiesString = serializeProperties(partData.properties);
+        String partitionKeys = serializePartitionKeys(partData);
+        String delimiter = getDelimiterCode(partData.storageDesc).toString();
+        String colTypes = partData.properties.getProperty("columns.types");
+
         if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) {
-            String inputFormatName = partData.storageDesc.getInputFormat();
-            String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
-            String partitionKeys = serializePartitionKeys(partData);
-            String colTypes = partData.properties.getProperty("columns.types");
             assertFileType(inputFormatName, partData);
-            userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM
-                    + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter + HiveDataFragmenter.HIVE_UD_DELIM + colTypes;
-        } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){
-            String inputFormatName = partData.storageDesc.getInputFormat();
-            String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
-            String propertiesString = serializeProperties(partData.properties);
-            String partitionKeys = serializePartitionKeys(partData);
-            userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName
-                    + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM
-                    + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
-        } else {
-            throw new IllegalArgumentException("HiveUtilities#makeUserData is not implemented for " + fragmenterClassName);
         }
-        return userData.getBytes();
+
+        hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter, delimiter, colTypes);
+
+        return hiveUserData.toString().getBytes();
+    }
+
+    /**
+     * The method parses raw user data into HiveUserData class
+     *
+     * @param input input data
+     * @param supportedSerdes list of allowed serdes in current context
+     * @return instance of HiveUserData class
+     * @throws UserDataException
+     */
+    public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... supportedSerdes) throws UserDataException{
+        String userData = new String(input.getFragmentUserData());
+        String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM, HiveUserData.getNumOfTokens());
+
+        if (toks.length != (HiveUserData.getNumOfTokens())) {
+            throw new UserDataException("HiveInputFormatFragmenter expected "
+                    + HiveUserData.getNumOfTokens() + " tokens, but got " + toks.length);
+        }
+
+        HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]), toks[5], toks[6]);
+
+        if (supportedSerdes.length > 0) {
+            /* Make sure this serde is supported */
+            PXF_HIVE_SERDES pxfHiveSerde = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName(), supportedSerdes);
+        }
+
+        return hiveUserData;
+    }
+
+    private static String getSerdeParameter(StorageDescriptor sd, String parameterKey) {
+        String parameterValue = null;
+        if (sd != null && sd.getSerdeInfo() != null && sd.getSerdeInfo().getParameters() != null && sd.getSerdeInfo().getParameters().get(parameterKey) != null) {
+            parameterValue = sd.getSerdeInfo().getParameters().get(parameterKey);
+        }
+
+        return parameterValue;
+    }
+
+    /**
+     * The method which extracts field delimiter from storage descriptor.
+     * When unable to extract delimiter from storage descriptor, default value is used
+     *
+     * @param sd StorageDescriptor of table/partition
+     * @return ASCII code of delimiter
+     */
+    public static Integer getDelimiterCode(StorageDescriptor sd) {
+        Integer delimiterCode = null;
+
+        String delimiter = getSerdeParameter(sd, serdeConstants.FIELD_DELIM);
+        if (delimiter != null) {
+            delimiterCode = (int) delimiter.charAt(0);
+            return delimiterCode;
+        }
+
+        delimiter = getSerdeParameter(sd, serdeConstants.SERIALIZATION_FORMAT);
+        if (delimiter != null) {
+            delimiterCode = Integer.parseInt(delimiter);
+            return delimiterCode;
+        }
+
+        return DEFAULT_DELIMITER_CODE;
+    }
+
+    /**
+     * The method determines whether metadata definition has any complex type
+     * @see EnumHiveToHawqType for complex type attribute definition
+     *
+     * @param metadata metadata of relation
+     * @return true if metadata has at least one field of complex type
+     */
+    public static boolean hasComplexTypes(Metadata metadata) {
+        boolean hasComplexTypes = false;
+        List<Field> fields = metadata.getFields();
+        for (Field field: fields) {
+            if (field.isComplexType()) {
+                hasComplexTypes = true;
+                break;
+            }
+        }
+
+        return hasComplexTypes;
+    }
+
+    /**
+     * Populates the given metadata object with the given table's fields and partitions,
+     * The partition fields are added at the end of the table schema.
+     * Throws an exception if the table contains unsupported field types.
+     * Supported HCatalog types: TINYINT,
+     * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP,
+     * DATE, DECIMAL, VARCHAR, CHAR.
+     *
+     * @param tbl Hive table
+     * @param metadata schema of given table
+     */
+    public static void getSchema(Table tbl, Metadata metadata) {
+
+        int hiveColumnsSize = tbl.getSd().getColsSize();
+        int hivePartitionsSize = tbl.getPartitionKeysSize();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
+        }
+
+        // check hive fields
+        try {
+            List<FieldSchema> hiveColumns = tbl.getSd().getCols();
+            for (FieldSchema hiveCol : hiveColumns) {
+                metadata.addField(HiveUtilities.mapHiveType(hiveCol));
+            }
+            // check partition fields
+            List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
+            for (FieldSchema hivePart : hivePartitions) {
+                metadata.addField(HiveUtilities.mapHiveType(hivePart));
+            }
+        } catch (UnsupportedTypeException e) {
+            String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " +
+                    e.getMessage();
+            throw new UnsupportedTypeException(errorMsg);
+        }
+    }
+
+    /**
+     * Creates an instance of a given serde type
+     *
+     * @param serdeType
+     * @param allowedSerdes
+     * @return instance of a given serde
+     * @throws UnsupportedTypeException if given serde is not allowed in current context
+     */
+    @SuppressWarnings("deprecation")
+    public static SerDe createDeserializer(PXF_HIVE_SERDES serdeType, PXF_HIVE_SERDES... allowedSerdes) throws Exception{
+        SerDe deserializer = null;
+        if (!Arrays.asList(allowedSerdes).contains(serdeType)) {
+            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name());
+        }
+
+        deserializer = (SerDe) Utilities.createAnyInstance(serdeType.getSerdeClassName());
+
+        return deserializer;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
new file mode 100644
index 0000000..f36f074
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
@@ -0,0 +1,61 @@
+package org.apache.hawq.pxf.plugins.hive.utilities;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hawq.pxf.api.Metadata;
+
+/**
+ * Factory class which returns optimal profile for given input format
+ *
+ */
+public class ProfileFactory {
+
+    private static final String HIVE_TEXT_PROFILE = "HiveText";
+    private static final String HIVE_RC_PROFILE = "HiveRC";
+    private static final String HIVE_ORC_PROFILE = "HiveORC";
+    private static final String HIVE_PROFILE = "Hive";
+
+    /**
+     * The method which returns optimal profile
+     *
+     * @param inputFormat input format of table/partition
+     * @param hasComplexTypes whether record has complex types, see @EnumHiveToHawqType
+     * @return name of optimal profile
+     */
+    public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
+        String profileName = null;
+        if (inputFormat instanceof TextInputFormat && !hasComplexTypes) {
+            profileName = HIVE_TEXT_PROFILE;
+        } else if (inputFormat instanceof RCFileInputFormat) {
+            profileName = HIVE_RC_PROFILE;
+        } else if (inputFormat instanceof OrcInputFormat) {
+            profileName = HIVE_ORC_PROFILE;
+        } else {
+            //Default case
+            profileName = HIVE_PROFILE;
+        }
+        return profileName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
index d9d97fc..6e40f9a 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
@@ -132,6 +132,7 @@ public class HiveMetadataFetcherTest {
         fields.add(new FieldSchema("field2", "int", null));
         StorageDescriptor sd = new StorageDescriptor();
         sd.setCols(fields);
+        sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
         Table hiveTable = new Table();
         hiveTable.setTableType("MANAGED_TABLE");
         hiveTable.setSd(sd);
@@ -176,6 +177,7 @@ public class HiveMetadataFetcherTest {
         fields.add(new FieldSchema("field2", "int", null));
         StorageDescriptor sd = new StorageDescriptor();
         sd.setCols(fields);
+        sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
 
         // Mock hive tables returned from hive client
         for(int index=1;index<=2;index++) {
@@ -235,6 +237,7 @@ public class HiveMetadataFetcherTest {
         fields.add(new FieldSchema("field2", "int", null));
         StorageDescriptor sd = new StorageDescriptor();
         sd.setCols(fields);
+        sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
         Table hiveTable2 = new Table();
         hiveTable2.setTableType("MANAGED_TABLE");
         hiveTable2.setSd(sd);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 7bbe811..8b4bf13 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.*;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,7 +45,7 @@ import static org.mockito.Mockito.when;
 
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({HiveORCAccessor.class, HiveInputFormatFragmenter.class, HdfsUtilities.class, HiveDataFragmenter.class})
+@PrepareForTest({HiveORCAccessor.class, HiveUtilities.class, HdfsUtilities.class, HiveDataFragmenter.class})
 @SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf",
         "org.apache.hadoop.hive.metastore.api.MetaException",
         "org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
@@ -61,8 +63,9 @@ public class HiveORCAccessorTest {
         jobConf = new JobConf();
         PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf);
 
-        PowerMockito.mockStatic(HiveInputFormatFragmenter.class);
-        PowerMockito.when(HiveInputFormatFragmenter.parseToks(any(InputData.class), any(String[].class))).thenReturn(new String[]{"", HiveDataFragmenter.HIVE_NO_PART_TBL, "true"});
+        PowerMockito.mockStatic(HiveUtilities.class);
+        PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "1", ""));
+
         PowerMockito.mockStatic(HdfsUtilities.class);
 
         PowerMockito.mockStatic(HiveDataFragmenter.class);