Posted to commits@hawq.apache.org by es...@apache.org on 2017/02/03 09:00:47 UTC
[45/50] [abbrv] incubator-hawq git commit: HAWQ-1228. Use profile
based on file format in HCatalog integration (HiveRC, HiveText profiles).
HAWQ-1228. Use profile based on file format in HCatalog integration (HiveRC, HiveText profiles).
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/6fa1ced2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/6fa1ced2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/6fa1ced2
Branch: refs/heads/2.1.0.0-incubating
Commit: 6fa1ced20e8bb2820b73e6904f77c4b4a1ed6de2
Parents: aac8868
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Mon Jan 30 23:38:06 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Mon Jan 30 23:38:50 2017 -0800
----------------------------------------------------------------------
pxf/gradle.properties | 2 +-
.../java/org/apache/hawq/pxf/api/Metadata.java | 51 +++-
.../org/apache/hawq/pxf/api/OutputFormat.java | 37 ++-
.../hawq/pxf/api/utilities/InputData.java | 1 +
.../org/apache/hawq/pxf/api/MetadataTest.java | 2 +-
.../hawq/pxf/plugins/hive/HiveAccessor.java | 14 +-
.../plugins/hive/HiveColumnarSerdeResolver.java | 60 ++---
.../pxf/plugins/hive/HiveDataFragmenter.java | 28 +-
.../plugins/hive/HiveInputFormatFragmenter.java | 41 ---
.../pxf/plugins/hive/HiveLineBreakAccessor.java | 10 +-
.../pxf/plugins/hive/HiveMetadataFetcher.java | 85 +++---
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 9 +-
.../pxf/plugins/hive/HiveORCSerdeResolver.java | 44 +---
.../pxf/plugins/hive/HiveRCFileAccessor.java | 10 +-
.../hawq/pxf/plugins/hive/HiveResolver.java | 107 ++++----
.../plugins/hive/HiveStringPassResolver.java | 39 ++-
.../hawq/pxf/plugins/hive/HiveUserData.java | 135 ++++++++++
.../hive/utilities/EnumHiveToHawqType.java | 31 ++-
.../plugins/hive/utilities/HiveUtilities.java | 263 +++++++++++++++----
.../plugins/hive/utilities/ProfileFactory.java | 61 +++++
.../plugins/hive/HiveMetadataFetcherTest.java | 3 +
.../pxf/plugins/hive/HiveORCAccessorTest.java | 9 +-
.../hive/utilities/HiveUtilitiesTest.java | 53 ++++
.../hive/utilities/ProfileFactoryTest.java | 65 +++++
.../hawq/pxf/service/BridgeOutputBuilder.java | 8 +-
.../pxf/service/MetadataResponseFormatter.java | 3 +-
.../apache/hawq/pxf/service/ProfileFactory.java | 45 ----
.../hawq/pxf/service/rest/MetadataResource.java | 9 +-
.../hawq/pxf/service/rest/VersionResource.java | 2 +-
.../pxf/service/utilities/ProtocolData.java | 22 +-
.../src/main/resources/pxf-profiles-default.xml | 14 +-
.../service/MetadataResponseFormatterTest.java | 16 +-
src/backend/access/external/fileam.c | 3 +
src/backend/access/external/pxfheaders.c | 21 +-
.../access/external/test/pxfheaders_test.c | 18 ++
src/backend/catalog/external/externalmd.c | 137 +++++++---
src/bin/gpfusion/gpbridgeapi.c | 6 +-
src/include/access/hd_work_mgr.h | 2 +
src/include/access/pxfheaders.h | 1 +
src/include/access/pxfuriparser.h | 2 +-
src/include/catalog/external/itemmd.h | 5 +
src/include/catalog/pg_exttable.h | 14 +-
.../regress/data/hcatalog/single_table.json | 2 +-
.../data/hcatalog/single_table_text.json | 1 +
src/test/regress/input/json_load.source | 12 +-
src/test/regress/json_utils.c | 24 +-
src/test/regress/output/json_load.source | 35 ++-
47 files changed, 1109 insertions(+), 453 deletions(-)
----------------------------------------------------------------------
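The gist of the change: instead of always routing HCatalog reads through the generic Hive profile, PXF now derives an optimal profile from each fragment's InputFormat, and falls back to the generic profile whenever the schema contains complex types. The new ProfileFactory.java body is not reproduced in this message, so the following is only a minimal sketch of that dispatch; the profile-name mapping here is an assumption.

// Minimal sketch of profile selection by file format. The real
// org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory added by this
// commit is not shown in this mail; profile names below are assumptions.
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextInputFormat;

public class ProfileFactorySketch {
    public static String get(InputFormat<?, ?> inputFormat, boolean hasComplexTypes) {
        if (hasComplexTypes) {
            return "Hive"; // complex columns are only handled by the generic profile
        }
        if (inputFormat instanceof TextInputFormat) {
            return "HiveText"; // delimited text files
        }
        if (inputFormat instanceof RCFileInputFormat) {
            return "HiveRC";   // RCFile with a columnar serde
        }
        if (inputFormat instanceof OrcInputFormat) {
            return "HiveORC";  // assumed name for the ORC-optimized profile
        }
        return "Hive";         // anything else keeps the generic profile
    }
}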
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/gradle.properties
----------------------------------------------------------------------
diff --git a/pxf/gradle.properties b/pxf/gradle.properties
index b003c56..2af17ef 100644
--- a/pxf/gradle.properties
+++ b/pxf/gradle.properties
@@ -23,5 +23,5 @@ hiveVersion=1.2.1
hbaseVersionJar=1.1.2
hbaseVersionRPM=1.1.2
tomcatVersion=7.0.62
-pxfProtocolVersion=v14
+pxfProtocolVersion=v15
osFamily=el6
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index 9e1c137..bb22d41 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -22,6 +22,8 @@ package org.apache.hawq.pxf.api;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
import org.apache.commons.lang.StringUtils;
@@ -68,14 +70,16 @@ public class Metadata {
}
/**
- * Class representing item field - name, type, source type, modifiers.
+ * Class representing item field - name, type, source type, complex type flag, modifiers.
* Type - exposed type of field
* Source type - type of field in underlying source
+ * Is complex type - whether the source type is complex
* Modifiers - additional attributes which describe type or field
*/
public static class Field {
private String name;
private EnumHawqType type; // field type which PXF exposes
+ private boolean isComplexType; // whether source field's type is complex
private String sourceType; // field type PXF reads from
private String[] modifiers; // type modifiers, optional field
@@ -91,12 +95,17 @@ public class Metadata {
this.sourceType = sourceType;
}
- public Field(String name, EnumHawqType type, String sourceType,
- String[] modifiers) {
+ public Field(String name, EnumHawqType type, String sourceType, String[] modifiers) {
this(name, type, sourceType);
this.modifiers = modifiers;
}
+ public Field(String name, EnumHawqType type, boolean isComplexType, String sourceType, String[] modifiers) {
+ this(name, type, sourceType);
+ this.modifiers = modifiers;
+ this.isComplexType = isComplexType;
+ }
+
public String getName() {
return name;
}
@@ -112,6 +121,14 @@ public class Metadata {
public String[] getModifiers() {
return modifiers;
}
+
+ public boolean isComplexType() {
+ return isComplexType;
+ }
+
+ public void setComplexType(boolean isComplexType) {
+ this.isComplexType = isComplexType;
+ }
}
/**
@@ -123,6 +140,34 @@ public class Metadata {
* Item's fields
*/
private List<Metadata.Field> fields;
+ private Set<OutputFormat> outputFormats;
+ private Map<String, String> outputParameters;
+
+ /**
+ * Returns an item's output formats, see {@link OutputFormat}.
+ *
+ * @return item's output formats
+ */
+ public Set<OutputFormat> getOutputFormats() {
+ return outputFormats;
+ }
+
+ public void setOutputFormats(Set<OutputFormat> outputFormats) {
+ this.outputFormats = outputFormats;
+ }
+
+ /**
+ * Returns an item's output parameters, for example delimiters.
+ *
+ * @return item's output parameters
+ */
+ public Map<String, String> getOutputParameters() {
+ return outputParameters;
+ }
+
+ public void setOutputParameters(Map<String, String> outputParameters) {
+ this.outputParameters = outputParameters;
+ }
/**
* Constructs an item's Metadata.
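A short usage sketch of the extended Metadata API, assuming only the constructors and setters shown in the hunk above; all values are illustrative.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
import org.apache.hawq.pxf.api.utilities.InputData;

class MetadataUsageSketch {
    static void describe(Metadata metadata) {
        // A complex-typed column: exposed to HAWQ as TEXT, sourced from a Hive array.
        metadata.addField(new Metadata.Field(
                "tags", EnumHawqType.TextType, true, "array", null));

        // Record the wire formats the item's fragments can be served in ...
        Set<OutputFormat> formats = new HashSet<OutputFormat>();
        formats.add(OutputFormat.TEXT);
        metadata.setOutputFormats(formats);

        // ... and format parameters such as the field delimiter's char code.
        Map<String, String> params = new HashMap<String, String>();
        params.put(InputData.DELIMITER_KEY, "44"); // ','
        metadata.setOutputParameters(params);
    }
}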
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
index 230f9ff..565db13 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
@@ -21,6 +21,39 @@ package org.apache.hawq.pxf.api;
/**
- * PXF supported output formats: {@link #TEXT} and {@link #BINARY}
+ * PXF supported output formats: {@link org.apache.hawq.pxf.service.io.Text} and {@link org.apache.hawq.pxf.service.io.GPDBWritable}
*/
-public enum OutputFormat {TEXT, BINARY}
+public enum OutputFormat {
+ TEXT("org.apache.hawq.pxf.service.io.Text"),
+ GPDBWritable("org.apache.hawq.pxf.service.io.GPDBWritable");
+
+ private String className;
+
+ OutputFormat(String className) {
+ this.className = className;
+ }
+
+ /**
+ * Returns a format's implementation class name.
+ *
+ * @return a format's implementation class name
+ */
+ public String getClassName() {
+ return className;
+ }
+
+ /**
+ * Looks up the output format for a given class name, if it exists.
+ *
+ * @throws UnsupportedTypeException if no output format with the given class name was found
+ * @return the output format for the given class name
+ */
+ public static OutputFormat getOutputFormat(String className) {
+ for (OutputFormat of : values()) {
+ if (of.getClassName().equals(className)) {
+ return of;
+ }
+ }
+ throw new UnsupportedTypeException("Unable to find output format by given class name: " + className);
+ }
+}
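Usage of the reworked enum, assuming only the code in the hunk above: the lookup maps a wire-format class name back to its constant and rejects unknown names.

import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.UnsupportedTypeException;

class OutputFormatLookupSketch {
    public static void main(String[] args) {
        OutputFormat text =
                OutputFormat.getOutputFormat("org.apache.hawq.pxf.service.io.Text");
        System.out.println(text == OutputFormat.TEXT); // true

        try {
            OutputFormat.getOutputFormat("com.example.Unknown"); // hypothetical name
        } catch (UnsupportedTypeException e) {
            System.out.println(e.getMessage()); // "Unable to find output format ..."
        }
    }
}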
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 5afedca..9816fdc 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -31,6 +31,7 @@ import java.util.*;
*/
public class InputData {
+ public static final String DELIMITER_KEY = "DELIMITER";
public static final int INVALID_SPLIT_IDX = -1;
private static final Log LOG = LogFactory.getLog(InputData.class);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
index 327a15b..9244ba2 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
@@ -32,7 +32,7 @@ public class MetadataTest {
@Test
public void createFieldEmptyNameType() {
try {
- Metadata.Field field = new Metadata.Field(null, null, null, null);
+ Metadata.Field field = new Metadata.Field(null, null, false, null, null);
fail("Empty name, type and source type shouldn't be allowed.");
} catch (IllegalArgumentException e) {
assertEquals("Field name, type and source type cannot be empty", e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
index ef9f76e..ea3accb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
@@ -28,6 +28,7 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.InputFormat;
@@ -42,10 +43,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.hawq.pxf.api.io.DataType.*;
-import static org.apache.hawq.pxf.api.io.DataType.BPCHAR;
-import static org.apache.hawq.pxf.api.io.DataType.BYTEA;
-
/**
* Accessor for Hive tables. The accessor will open and read a split belonging
* to a Hive table. Opening a split means creating the corresponding InputFormat
@@ -138,12 +135,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
*/
private InputFormat<?, ?> createInputFormat(InputData input)
throws Exception {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
- initPartitionFields(toks[3]);
- filterInFragmenter = new Boolean(toks[4]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
return HiveDataFragmenter.makeInputFormat(
- toks[0]/* inputFormat name */, jobConf);
+ hiveUserData.getInputFormatName()/* inputFormat name */, jobConf);
}
/*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 362ac0d..7d85efe 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -22,12 +22,15 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -57,11 +59,10 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
*/
public class HiveColumnarSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
- private ColumnarSerDeBase deserializer;
private boolean firstColumn;
private StringBuilder builder;
private StringBuilder parts;
- private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+ private HiveUtilities.PXF_HIVE_SERDES serdeType;
public HiveColumnarSerdeResolver(InputData input) throws Exception {
super(input);
@@ -70,24 +71,22 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
/* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
@Override
void parseUserData(InputData input) throws Exception {
- String[] toks = HiveInputFormatFragmenter.parseToks(input);
- String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
- if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
- } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
- }
- else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
- }
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+ String serdeClassName = hiveUserData.getSerdeClassName();
+
+ serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(serdeClassName);
parts = new StringBuilder();
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ partitionKeys = hiveUserData.getPartitionKeys();
parseDelimiterChar(input);
}
@Override
void initPartitionFields() {
- initPartitionFields(parts);
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ initTextPartitionFields(parts);
+ } else {
+ super.initPartitionFields();
+ }
}
/**
@@ -97,15 +96,19 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
*/
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
- firstColumn = true;
- builder = new StringBuilder();
- Object tuple = deserializer.deserialize((Writable) onerow.getData());
- ObjectInspector oi = deserializer.getObjectInspector();
-
- traverseTuple(tuple, oi);
- /* We follow Hive convention. Partition fields are always added at the end of the record */
- builder.append(parts);
- return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ firstColumn = true;
+ builder = new StringBuilder();
+ Object tuple = deserializer.deserialize((Writable) onerow.getData());
+ ObjectInspector oi = deserializer.getObjectInspector();
+
+ traverseTuple(tuple, oi);
+ /* We follow Hive convention. Partition fields are always added at the end of the record */
+ builder.append(parts);
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+ } else {
+ return super.getFields(onerow);
+ }
}
/*
@@ -138,14 +141,7 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
- if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
- deserializer = new ColumnarSerDe();
- } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
- deserializer = new LazyBinaryColumnarSerDe();
- } else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
- }
-
+ deserializer = HiveUtilities.createDeserializer(serdeType, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
deserializer.initialize(new JobConf(new Configuration(), HiveColumnarSerdeResolver.class), serdeProperties);
}
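The resolver now serves both wire formats from one class: TEXT flattens the record into a single VARCHAR field, while GPDBWritable falls through to HiveResolver's typed, per-column resolution. The shape of that dispatch, condensed into a sketch (the field name is hypothetical):

import java.util.List;

import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.service.utilities.ProtocolData;

abstract class DualFormatResolverSketch {
    protected ProtocolData protocolData; // hypothetical field name

    public List<OneField> getFields(OneRow row) throws Exception {
        if (protocolData.outputFormat() == OutputFormat.TEXT) {
            return textFields(row);  // one concatenated VARCHAR field
        }
        return typedFields(row);     // per-column, typed OneField list
    }

    abstract List<OneField> textFields(OneRow row) throws Exception;
    abstract List<OneField> typedFields(OneRow row) throws Exception;
}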
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 2d2b53e..a03d3b7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -59,7 +59,7 @@ import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.ProfilesConf;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
-import org.apache.hawq.pxf.service.ProfileFactory;
+import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;
/**
* Fragmenter class for HIVE tables. <br>
@@ -78,7 +78,6 @@ public class HiveDataFragmenter extends Fragmenter {
private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class);
private static final short ALL_PARTS = -1;
- public static final String HIVE_UD_DELIM = "!HUDD!";
public static final String HIVE_1_PART_DELIM = "!H1PD!";
public static final String HIVE_PARTITIONS_DELIM = "!HPAD!";
public static final String HIVE_NO_PART_TBL = "!HNPT!";
@@ -163,6 +162,10 @@ public class HiveDataFragmenter extends Fragmenter {
Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
+ Metadata metadata = new Metadata(tblDesc);
+ HiveUtilities.getSchema(tbl, metadata);
+ boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata);
+
verifySchema(tbl);
List<Partition> partitions = null;
@@ -228,7 +231,7 @@ public class HiveDataFragmenter extends Fragmenter {
if (partitions.isEmpty()) {
props = getSchema(tbl);
- fetchMetaDataForSimpleTable(descTable, props);
+ fetchMetaDataForSimpleTable(descTable, props, hasComplexTypes);
} else {
List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
@@ -239,7 +242,7 @@ public class HiveDataFragmenter extends Fragmenter {
tblDesc.getPath(), tblDesc.getName(),
partitionKeys);
fetchMetaDataForPartitionedTable(descPartition, props,
- partition, partitionKeys, tblDesc.getName());
+ partition, partitionKeys, tblDesc.getName(), hasComplexTypes);
}
}
}
@@ -255,29 +258,30 @@ public class HiveDataFragmenter extends Fragmenter {
}
private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
- Properties props) throws Exception {
- fetchMetaDataForSimpleTable(stdsc, props, null);
+ Properties props, boolean hasComplexTypes) throws Exception {
+ fetchMetaDataForSimpleTable(stdsc, props, null, hasComplexTypes);
}
private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
- Properties props, String tableName)
+ Properties props, String tableName, boolean hasComplexTypes)
throws Exception {
fetchMetaData(new HiveTablePartition(stdsc, props, null, null,
- tableName));
+ tableName), hasComplexTypes);
}
private void fetchMetaDataForPartitionedTable(StorageDescriptor stdsc,
Properties props,
Partition partition,
List<FieldSchema> partitionKeys,
- String tableName)
+ String tableName,
+ boolean hasComplexTypes)
throws Exception {
fetchMetaData(new HiveTablePartition(stdsc, props, partition,
- partitionKeys, tableName));
+ partitionKeys, tableName), hasComplexTypes);
}
/* Fills a table partition */
- private void fetchMetaData(HiveTablePartition tablePartition)
+ private void fetchMetaData(HiveTablePartition tablePartition, boolean hasComplexTypes)
throws Exception {
InputFormat<?, ?> fformat = makeInputFormat(
tablePartition.storageDesc.getInputFormat(), jobConf);
@@ -285,7 +289,7 @@ public class HiveDataFragmenter extends Fragmenter {
if (inputData.getProfile() != null) {
// evaluate optimal profile based on file format if profile was explicitly specified in url
// if user passed accessor+fragmenter+resolver - use them
- profile = ProfileFactory.get(fformat);
+ profile = ProfileFactory.get(fformat, hasComplexTypes);
}
String fragmenterForProfile = null;
if (profile != null) {
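HiveUtilities.hasComplexTypes(metadata) is referenced above but its body lives in the HiveUtilities.java hunk, which this message truncates. A plausible minimal reading, assuming a getFields() accessor on Metadata, is a scan for any field whose source type is complex:

import org.apache.hawq.pxf.api.Metadata;

class ComplexTypeCheckSketch {
    static boolean hasComplexTypes(Metadata metadata) {
        for (Metadata.Field field : metadata.getFields()) { // assumed accessor
            if (field.isComplexType()) {
                return true; // a single complex column forces the generic profile
            }
        }
        return false;
    }
}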
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index ca4501b..9199118 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -55,11 +55,6 @@ import java.util.Properties;
*/
public class HiveInputFormatFragmenter extends HiveDataFragmenter {
private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
- private static final int EXPECTED_NUM_OF_TOKS = 4;
- public static final int TOK_SERDE = 0;
- public static final int TOK_KEYS = 1;
- public static final int TOK_FILTER_DONE = 2;
- public static final int TOK_COL_TYPES = 3;
/** Defines the Hive input formats currently supported in pxf */
public enum PXF_HIVE_INPUT_FORMATS {
@@ -68,14 +63,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
ORC_FILE_INPUT_FORMAT
}
- /** Defines the Hive serializers (serde classes) currently supported in pxf */
- public enum PXF_HIVE_SERDES {
- COLUMNAR_SERDE,
- LAZY_BINARY_COLUMNAR_SERDE,
- LAZY_SIMPLE_SERDE,
- ORC_SERDE
- }
-
/**
* Constructs a HiveInputFormatFragmenter.
*
@@ -85,34 +72,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
super(inputData, HiveInputFormatFragmenter.class);
}
- /**
- * Extracts the user data:
- * serde, partition keys and whether filter was included in fragmenter
- *
- * @param input input data from client
- * @param supportedSerdes supported serde names
- * @return parsed tokens
- * @throws UserDataException if user data contains unsupported serde
- * or wrong number of tokens
- */
- static public String[] parseToks(InputData input, String... supportedSerdes)
- throws UserDataException {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HIVE_UD_DELIM);
- if (supportedSerdes.length > 0
- && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) {
- throw new UserDataException(toks[TOK_SERDE]
- + " serializer isn't supported by " + input.getAccessor());
- }
-
- if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
- throw new UserDataException("HiveInputFormatFragmenter expected "
- + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
- }
-
- return toks;
- }
-
/*
* Checks that hive fields and partitions match the HAWQ schema. Throws an
* exception if: - the number of fields (+ partitions) do not match the HAWQ
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
index ed4f908..66680bb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
@@ -21,12 +21,12 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Specialization of HiveAccessor for a Hive table stored as Text files.
@@ -43,9 +43,9 @@ public class HiveLineBreakAccessor extends HiveAccessor {
public HiveLineBreakAccessor(InputData input) throws Exception {
super(input, new TextInputFormat());
((TextInputFormat) inputFormat).configure(jobConf);
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 91f91e7..dc76289 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -21,33 +21,49 @@ package org.apache.hawq.pxf.plugins.hive;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.ProfilesConf;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;
/**
* Class for connecting to Hive's MetaStore and getting schema of Hive tables.
*/
public class HiveMetadataFetcher extends MetadataFetcher {
+ private static final String DELIM_FIELD = InputData.DELIMITER_KEY;
+
private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
private HiveMetaStoreClient client;
+ private JobConf jobConf;
public HiveMetadataFetcher(InputData md) {
super(md);
// init hive metastore client connection.
client = HiveUtilities.initHiveClient();
+ jobConf = new JobConf(new Configuration());
}
/**
@@ -82,8 +98,28 @@ public class HiveMetadataFetcher extends MetadataFetcher {
try {
Metadata metadata = new Metadata(tblDesc);
Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
- getSchema(tbl, metadata);
+ HiveUtilities.getSchema(tbl, metadata);
+ boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata);
metadataList.add(metadata);
+ List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1);
+ Set<OutputFormat> formats = new HashSet<OutputFormat>();
+ //If table has partitions - find out all formats
+ for (Partition tablePartition : tablePartitions) {
+ String inputFormat = tablePartition.getSd().getInputFormat();
+ OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes);
+ formats.add(outputFormat);
+ }
+ //If table has no partitions - get single format of table
+ if (tablePartitions.size() == 0 ) {
+ String inputFormat = tbl.getSd().getInputFormat();
+ OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes);
+ formats.add(outputFormat);
+ }
+ metadata.setOutputFormats(formats);
+ Map<String, String> outputParameters = new HashMap<String, String>();
+ Integer delimiterCode = HiveUtilities.getDelimiterCode(tbl.getSd());
+ outputParameters.put(DELIM_FIELD, delimiterCode.toString());
+ metadata.setOutputParameters(outputParameters);
} catch (UnsupportedTypeException | UnsupportedOperationException e) {
if(ignoreErrors) {
LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage());
@@ -97,42 +133,13 @@ public class HiveMetadataFetcher extends MetadataFetcher {
return metadataList;
}
-
- /**
- * Populates the given metadata object with the given table's fields and partitions,
- * The partition fields are added at the end of the table schema.
- * Throws an exception if the table contains unsupported field types.
- * Supported HCatalog types: TINYINT,
- * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP,
- * DATE, DECIMAL, VARCHAR, CHAR.
- *
- * @param tbl Hive table
- * @param metadata schema of given table
- */
- private void getSchema(Table tbl, Metadata metadata) {
-
- int hiveColumnsSize = tbl.getSd().getColsSize();
- int hivePartitionsSize = tbl.getPartitionKeysSize();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
- }
-
- // check hive fields
- try {
- List<FieldSchema> hiveColumns = tbl.getSd().getCols();
- for (FieldSchema hiveCol : hiveColumns) {
- metadata.addField(HiveUtilities.mapHiveType(hiveCol));
- }
- // check partition fields
- List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
- for (FieldSchema hivePart : hivePartitions) {
- metadata.addField(HiveUtilities.mapHiveType(hivePart));
- }
- } catch (UnsupportedTypeException e) {
- String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " +
- e.getMessage();
- throw new UnsupportedTypeException(errorMsg);
- }
+ private OutputFormat getOutputFormat(String inputFormat, boolean hasComplexTypes) throws Exception {
+ OutputFormat outputFormat = null;
+ InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf);
+ String profile = ProfileFactory.get(fformat, hasComplexTypes);
+ String outputFormatClassName = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT");
+ outputFormat = OutputFormat.getOutputFormat(outputFormatClassName);
+ return outputFormat;
}
+
}
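HiveUtilities.getDelimiterCode, used above to fill the DELIMITER output parameter, is likewise new and not shown in this message. A plausible sketch, assuming it reads the serde's field.delim parameter and falls back to Hive's default ctrl-A delimiter (char code 1):

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;

class DelimiterCodeSketch {
    static Integer getDelimiterCode(StorageDescriptor sd) {
        String delim = sd.getSerdeInfo().getParameters().get(serdeConstants.FIELD_DELIM);
        if (delim != null && !delim.isEmpty()) {
            return (int) delim.charAt(0);
        }
        return 1; // Hive's default field delimiter is \001 (ctrl-A)
    }
}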
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index dc195f4..07348b0 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -30,6 +30,7 @@ import org.apache.hawq.pxf.api.BasicFilter;
import org.apache.hawq.pxf.api.LogicalFilter;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.lang.StringUtils;
import java.sql.Date;
@@ -37,7 +38,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Specialization of HiveAccessor for a Hive table that stores only ORC files.
@@ -61,9 +62,9 @@ public class HiveORCAccessor extends HiveAccessor {
*/
public HiveORCAccessor(InputData input) throws Exception {
super(input, new OrcInputFormat());
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.ORC_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
index 93aa474..fec0ff0 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -34,6 +34,7 @@ import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
import java.util.*;
@@ -43,8 +44,7 @@ import java.util.*;
*/
public class HiveORCSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
- private OrcSerde deserializer;
- private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+ private HiveUtilities.PXF_HIVE_SERDES serdeType;
private String typesString;
public HiveORCSerdeResolver(InputData input) throws Exception {
@@ -54,41 +54,16 @@ public class HiveORCSerdeResolver extends HiveResolver {
/* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
@Override
void parseUserData(InputData input) throws Exception {
- String[] toks = HiveInputFormatFragmenter.parseToks(input);
- String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
- if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
- } else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
- }
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
- typesString = toks[HiveInputFormatFragmenter.TOK_COL_TYPES];
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE);
+ serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName());
+ partitionKeys = hiveUserData.getPartitionKeys();
+ typesString = hiveUserData.getColTypes();
collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
: input.getUserProperty("COLLECTION_DELIM");
mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
: input.getUserProperty("MAPKEY_DELIM");
}
- /**
- * getFields returns a singleton list of OneField item.
- * OneField item contains two fields: an integer representing the VARCHAR type and a Java
- * Object representing the field value.
- */
- @Override
- public List<OneField> getFields(OneRow onerow) throws Exception {
-
- Object tuple = deserializer.deserialize((Writable) onerow.getData());
- // Each Hive record is a Struct
- StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
- List<OneField> record = traverseStruct(tuple, soi, false);
-
- //Add partition fields if any
- record.addAll(getPartitionFields());
-
- return record;
-
- }
-
/*
* Get and init the deserializer for the records of this Hive data fragment.
* Suppress Warnings added because deserializer.initialize is an abstract function that is deprecated
@@ -127,12 +102,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
- if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
- deserializer = new OrcSerde();
- } else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
- }
-
+ deserializer = HiveUtilities.createDeserializer(serdeType, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE);
deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
index 2686851..7132d7b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
@@ -21,7 +21,7 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.mapred.FileSplit;
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Specialization of HiveAccessor for a Hive table that stores only RC files.
@@ -47,9 +47,9 @@ public class HiveRCFileAccessor extends HiveAccessor {
*/
public HiveRCFileAccessor(InputData input) throws Exception {
super(input, new RCFileInputFormat());
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.COLUMNAR_SERDE, PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index 3837f78..5646969 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -27,6 +27,7 @@ import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.lang.CharUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,10 +75,10 @@ public class HiveResolver extends Plugin implements ReadResolver {
protected static final String COLLECTION_DELIM = ",";
protected String collectionDelim;
protected String mapkeyDelim;
- private SerDe deserializer;
+ protected SerDe deserializer;
private List<OneField> partitionFields;
- private String serdeName;
- private String propsString;
+ protected String serdeClassName;
+ protected String propsString;
String partitionKeys;
protected char delimiter;
String nullChar = "\\N";
@@ -133,19 +134,11 @@ public class HiveResolver extends Plugin implements ReadResolver {
/* Parses user data string (arrived from fragmenter). */
void parseUserData(InputData input) throws Exception {
- final int EXPECTED_NUM_OF_TOKS = 5;
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
-
- if (toks.length != EXPECTED_NUM_OF_TOKS) {
- throw new UserDataException("HiveResolver expected "
- + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
- }
-
- serdeName = toks[1];
- propsString = toks[2];
- partitionKeys = toks[3];
+ serdeClassName = hiveUserData.getSerdeClassName();
+ propsString = hiveUserData.getPropertiesString();
+ partitionKeys = hiveUserData.getPartitionKeys();
collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
: input.getUserProperty("COLLECTION_DELIM");
@@ -160,14 +153,16 @@ public class HiveResolver extends Plugin implements ReadResolver {
void initSerde(InputData inputData) throws Exception {
Properties serdeProperties;
- Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader());
+ Class<?> c = Class.forName(serdeClassName, true, JavaUtils.getClassLoader());
deserializer = (SerDe) c.newInstance();
serdeProperties = new Properties();
- ByteArrayInputStream inStream = new ByteArrayInputStream(
- propsString.getBytes());
- serdeProperties.load(inStream);
- deserializer.initialize(new JobConf(conf, HiveResolver.class),
- serdeProperties);
+ if (propsString != null ) {
+ ByteArrayInputStream inStream = new ByteArrayInputStream(propsString.getBytes());
+ serdeProperties.load(inStream);
+ } else {
+ throw new IllegalArgumentException("propsString is mandatory to initialize serde.");
+ }
+ deserializer.initialize(new JobConf(conf, HiveResolver.class), serdeProperties);
}
/*
@@ -271,7 +266,7 @@ public class HiveResolver extends Plugin implements ReadResolver {
* The partition fields are initialized one time based on userData provided
* by the fragmenter.
*/
- void initPartitionFields(StringBuilder parts) {
+ void initTextPartitionFields(StringBuilder parts) {
if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
return;
}
@@ -625,47 +620,49 @@ public class HiveResolver extends Plugin implements ReadResolver {
*/
void parseDelimiterChar(InputData input) {
- String userDelim = input.getUserProperty("DELIMITER");
+ String userDelim = input.getUserProperty(InputData.DELIMITER_KEY);
if (userDelim == null) {
- throw new IllegalArgumentException("DELIMITER is a required option");
- }
-
- final int VALID_LENGTH = 1;
- final int VALID_LENGTH_HEX = 4;
-
- if (userDelim.startsWith("\\x")) { // hexadecimal sequence
-
- if (userDelim.length() != VALID_LENGTH_HEX) {
+ /* No DELIMITER in URL, try to get it from fragment's user data*/
+ HiveUserData hiveUserData = null;
+ try {
+ hiveUserData = HiveUtilities.parseHiveUserData(input);
+ } catch (UserDataException ude) {
+ throw new IllegalArgumentException("Couldn't parse user data to get " + InputData.DELIMITER_KEY);
+ }
+ if (hiveUserData.getDelimiter() == null) {
+ throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option");
+ }
+ delimiter = (char) Integer.valueOf(hiveUserData.getDelimiter()).intValue();
+ } else {
+ final int VALID_LENGTH = 1;
+ final int VALID_LENGTH_HEX = 4;
+ if (userDelim.startsWith("\\x")) { // hexadecimal sequence
+ if (userDelim.length() != VALID_LENGTH_HEX) {
+ throw new IllegalArgumentException(
+ "Invalid hexdecimal value for delimiter (got"
+ + userDelim + ")");
+ }
+ delimiter = (char) Integer.parseInt(
+ userDelim.substring(2, VALID_LENGTH_HEX), 16);
+ if (!CharUtils.isAscii(delimiter)) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+ + delimiter + ")");
+ }
+ return;
+ }
+ if (userDelim.length() != VALID_LENGTH) {
throw new IllegalArgumentException(
- "Invalid hexdecimal value for delimiter (got"
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
+ userDelim + ")");
}
-
- delimiter = (char) Integer.parseInt(
- userDelim.substring(2, VALID_LENGTH_HEX), 16);
-
- if (!CharUtils.isAscii(delimiter)) {
+ if (!CharUtils.isAscii(userDelim.charAt(0))) {
throw new IllegalArgumentException(
"Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
- + delimiter + ")");
+ + userDelim + ")");
}
-
- return;
- }
-
- if (userDelim.length() != VALID_LENGTH) {
- throw new IllegalArgumentException(
- "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
- + userDelim + ")");
+ delimiter = userDelim.charAt(0);
}
-
- if (!CharUtils.isAscii(userDelim.charAt(0))) {
- throw new IllegalArgumentException(
- "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
- + userDelim + ")");
- }
-
- delimiter = userDelim.charAt(0);
}
}
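To summarize the three delimiter encodings the rewritten parseDelimiterChar accepts (a standalone illustration, not PXF code):

class DelimiterParseSketch {
    public static void main(String[] args) {
        // "\x09": a 4-character hexadecimal escape -> tab
        String hex = "\\x09";
        char fromHex = (char) Integer.parseInt(hex.substring(2, 4), 16);
        // No DELIMITER option: decimal char code carried in HiveUserData, "44" -> ','
        char fromUserData = (char) Integer.valueOf("44").intValue();
        // "|": a plain single ASCII character, used as-is
        char plain = "|".charAt(0);
        System.out.printf("%d %c %c%n", (int) fromHex, fromUserData, plain); // 9 , |
    }
}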
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index fdc5f69..76d5cad 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -22,7 +22,11 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
import java.util.Collections;
import java.util.List;
@@ -42,21 +46,32 @@ public class HiveStringPassResolver extends HiveResolver {
@Override
void parseUserData(InputData input) throws Exception {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
parseDelimiterChar(input);
parts = new StringBuilder();
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ partitionKeys = hiveUserData.getPartitionKeys();
+ serdeClassName = hiveUserData.getSerdeClassName();
+
+ /* Needed only for GPDBWritable format*/
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.GPDBWritable) {
+ propsString = hiveUserData.getPropertiesString();
+ }
}
@Override
- void initSerde(InputData input) {
- /* nothing to do here */
+ void initSerde(InputData input) throws Exception {
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.GPDBWritable) {
+ super.initSerde(input);
+ }
}
@Override
void initPartitionFields() {
- initPartitionFields(parts);
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ initTextPartitionFields(parts);
+ } else {
+ super.initPartitionFields();
+ }
}
/**
@@ -66,9 +81,13 @@ public class HiveStringPassResolver extends HiveResolver {
*/
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
- String line = (onerow.getData()).toString();
-
- /* We follow Hive convention. Partition fields are always added at the end of the record */
- return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ String line = (onerow.getData()).toString();
+ /* We follow Hive convention. Partition fields are always added at the end of the record */
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+ } else {
+ return super.getFields(onerow);
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
new file mode 100644
index 0000000..e3632e0
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.plugins.hive;
+
+/**
+ * A carrier class for the user data of a Hive fragment.
+ *
+ */
+public class HiveUserData {
+
+ public static final String HIVE_UD_DELIM = "!HUDD!";
+ private static final int EXPECTED_NUM_OF_TOKS = 7;
+
+ public HiveUserData(String inputFormatName, String serdeClassName,
+ String propertiesString, String partitionKeys,
+ boolean filterInFragmenter,
+ String delimiter,
+ String colTypes) {
+
+ this.inputFormatName = inputFormatName;
+ this.serdeClassName = serdeClassName;
+ this.propertiesString = propertiesString;
+ this.partitionKeys = partitionKeys;
+ this.filterInFragmenter = filterInFragmenter;
+ this.delimiter = (delimiter == null ? "0" : delimiter);
+ this.colTypes = colTypes;
+ }
+
+ /**
+ * Returns input format of a fragment
+ *
+ * @return input format of a fragment
+ */
+ public String getInputFormatName() {
+ return inputFormatName;
+ }
+
+ /**
+ * Returns SerDe class name
+ *
+ * @return SerDe class name
+ */
+ public String getSerdeClassName() {
+ return serdeClassName;
+ }
+
+ /**
+ * Returns properties string needed for SerDe initialization
+ *
+ * @return properties string needed for SerDe initialization
+ */
+ public String getPropertiesString() {
+ return propertiesString;
+ }
+
+ /**
+ * Returns partition keys
+ *
+ * @return partition keys
+ */
+ public String getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ /**
+ * Returns whether filtering was done in fragmenter
+ *
+ * @return true if filtering was done in fragmenter
+ */
+ public boolean isFilterInFragmenter() {
+ return filterInFragmenter;
+ }
+
+ /**
+ * Returns field delimiter
+ *
+ * @return field delimiter
+ */
+ public String getDelimiter() {
+ return delimiter;
+ }
+
+ public void setDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ private String inputFormatName;
+ private String serdeClassName;
+ private String propertiesString;
+ private String partitionKeys;
+ private boolean filterInFragmenter;
+ private String delimiter;
+ private String colTypes;
+
+ /**
+ * Returns the expected number of tokens in the raw user data
+ *
+ * @return number of tokens in raw user data
+ */
+ public static int getNumOfTokens() {
+ return EXPECTED_NUM_OF_TOKS;
+ }
+
+ @Override
+ public String toString() {
+ return inputFormatName + HiveUserData.HIVE_UD_DELIM
+ + serdeClassName + HiveUserData.HIVE_UD_DELIM
+ + propertiesString + HiveUserData.HIVE_UD_DELIM
+ + partitionKeys + HiveUserData.HIVE_UD_DELIM
+ + filterInFragmenter + HiveUserData.HIVE_UD_DELIM
+ + delimiter + HiveUserData.HIVE_UD_DELIM
+ + colTypes;
+ }
+
+ public String getColTypes() {
+ return colTypes;
+ }
+}
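HiveUserData.toString serializes the seven tokens joined by the !HUDD! delimiter; HiveUtilities.parseHiveUserData (in the truncated HiveUtilities.java hunk below) reverses it. A plausible sketch of that parse, modeled on the parseToks logic removed from HiveInputFormatFragmenter above:

import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hive.HiveUserData;

class HiveUserDataParseSketch {
    static HiveUserData parse(InputData input) throws UserDataException {
        String userData = new String(input.getFragmentUserData());
        String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM);
        if (toks.length != HiveUserData.getNumOfTokens()) {
            throw new UserDataException("expected " + HiveUserData.getNumOfTokens()
                    + " tokens, got " + toks.length);
        }
        return new HiveUserData(toks[0], toks[1], toks[2], toks[3],
                Boolean.valueOf(toks[4]), toks[5], toks[6]);
    }
}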
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
index d91e949..ea65a66 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
@@ -42,37 +42,48 @@ public enum EnumHiveToHawqType {
FloatType("float", EnumHawqType.Float4Type),
DoubleType("double", EnumHawqType.Float8Type),
StringType("string", EnumHawqType.TextType),
- BinaryType("binary", EnumHawqType.ByteaType),
+ BinaryType("binary", EnumHawqType.ByteaType, true),
TimestampType("timestamp", EnumHawqType.TimestampType),
DateType("date", EnumHawqType.DateType),
DecimalType("decimal", EnumHawqType.NumericType, "[(,)]"),
VarcharType("varchar", EnumHawqType.VarcharType, "[(,)]"),
CharType("char", EnumHawqType.BpcharType, "[(,)]"),
- ArrayType("array", EnumHawqType.TextType, "[<,>]"),
- MapType("map", EnumHawqType.TextType, "[<,>]"),
- StructType("struct", EnumHawqType.TextType, "[<,>]"),
- UnionType("uniontype", EnumHawqType.TextType, "[<,>]");
+ ArrayType("array", EnumHawqType.TextType, "[<,>]", true),
+ MapType("map", EnumHawqType.TextType, "[<,>]", true),
+ StructType("struct", EnumHawqType.TextType, "[<,>]", true),
+ UnionType("uniontype", EnumHawqType.TextType, "[<,>]", true);
private String typeName;
private EnumHawqType hawqType;
private String splitExpression;
private byte size;
+ private boolean isComplexType;
EnumHiveToHawqType(String typeName, EnumHawqType hawqType) {
this.typeName = typeName;
this.hawqType = hawqType;
}
-
+
EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) {
this(typeName, hawqType);
this.size = size;
}
+ EnumHiveToHawqType(String typeName, EnumHawqType hawqType, boolean isComplexType) {
+ this(typeName, hawqType);
+ this.isComplexType = isComplexType;
+ }
+
EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) {
this(typeName, hawqType);
this.splitExpression = splitExpression;
}
+ EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression, boolean isComplexType) {
+ this(typeName, hawqType, splitExpression);
+ this.isComplexType = isComplexType;
+ }
+
/**
*
* @return name of type
@@ -216,4 +227,12 @@ public enum EnumHiveToHawqType {
return size;
}
+ public boolean isComplexType() {
+ return isComplexType;
+ }
+
+ public void setComplexType(boolean isComplexType) {
+ this.isComplexType = isComplexType;
+ }
+
}
\ No newline at end of file
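
For illustration (outside this commit), a sketch listing which Hive types the new flag marks as complex; it relies only on the enum members and the isComplexType() accessor shown above.

    import org.apache.hawq.pxf.plugins.hive.utilities.EnumHiveToHawqType;

    public class ComplexTypeListing {
        public static void main(String[] args) {
            // Expected to print BinaryType, ArrayType, MapType, StructType, UnionType
            for (EnumHiveToHawqType t : EnumHiveToHawqType.values()) {
                if (t.isComplexType()) {
                    System.out.println(t.name());
                }
            }
        }
    }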
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index f7ebf4d..37f4ac2 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -35,17 +35,28 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.*;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.Metadata.Field;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
-import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.plugins.hive.HiveUserData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Class containing helper functions connecting
@@ -53,6 +64,46 @@ import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDE
*/
public class HiveUtilities {
+ /** Defines the Hive serializers (serde classes) currently supported in PXF */
+ public enum PXF_HIVE_SERDES {
+ COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"),
+ LAZY_BINARY_COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"),
+ LAZY_SIMPLE_SERDE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+ ORC_SERDE("org.apache.hadoop.hive.ql.io.orc.OrcSerde");
+
+ private String serdeClassName;
+
+ PXF_HIVE_SERDES(String serdeClassName) {
+ this.serdeClassName = serdeClassName;
+ }
+
+ /**
+ * Looks up a serde by its serde class name.
+ *
+ * @param serdeClassName input serde class name
+ * @param allowedSerdes all serdes allowed in the current context
+ * @return serde matching the given class name
+ * @throws UnsupportedTypeException if no serde matches the class name, or the matching serde is not allowed in the current context
+ */
+ public static PXF_HIVE_SERDES getPxfHiveSerde(String serdeClassName, PXF_HIVE_SERDES... allowedSerdes) {
+ for (PXF_HIVE_SERDES s : values()) {
+ if (s.getSerdeClassName().equals(serdeClassName)) {
+
+ if (allowedSerdes.length > 0
+ && !Arrays.asList(allowedSerdes).contains(s)) {
+ throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeClassName);
+ }
+ return s;
+ }
+ }
+ throw new UnsupportedTypeException("Unable to find serde for class name: "+ serdeClassName);
+ }
+
+ public String getSerdeClassName() {
+ return serdeClassName;
+ }
+ }
+
private static final Log LOG = LogFactory.getLog(HiveUtilities.class);
private static final String WILDCARD = "*";
@@ -64,10 +115,7 @@ public class HiveUtilities {
static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
- static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
- static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
- static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
- static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+ private static final int DEFAULT_DELIMITER_CODE = 44; // ASCII code of ',' (comma)
/**
* Initializes the HiveMetaStoreClient
@@ -162,7 +210,7 @@ public class HiveUtilities {
} else
hiveTypeName = hiveType;
- return new Metadata.Field(fieldName, hawqType, hiveTypeName, modifiers);
+ return new Metadata.Field(fieldName, hawqType, hiveToHawqType.isComplexType(), hiveTypeName, modifiers);
}
/**
@@ -376,31 +424,6 @@ public class HiveUtilities {
}
}
- /*
- * Validates that partition serde corresponds to PXF supported serdes and
- * transforms the class name to an enumeration for writing it to the
- * resolvers on other PXF instances.
- */
- private static String assertSerde(String className, HiveTablePartition partData)
- throws Exception {
- switch (className) {
- case STR_COLUMNAR_SERDE:
- return PXF_HIVE_SERDES.COLUMNAR_SERDE.name();
- case STR_LAZY_BINARY_COLUMNAR_SERDE:
- return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
- case STR_LAZY_SIMPLE_SERDE:
- return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
- case STR_ORC_SERDE:
- return PXF_HIVE_SERDES.ORC_SERDE.name();
- default:
- throw new UnsupportedTypeException(
- "HiveInputFormatFragmenter does not yet support "
- + className + " for " + partData
- + ". Supported serializers are: "
- + Arrays.toString(PXF_HIVE_SERDES.values()));
- }
- }
-
/* Turns the partition keys into a string */
public static String serializePartitionKeys(HiveTablePartition partData) throws Exception {
@@ -429,10 +452,19 @@ public class HiveUtilities {
return partitionKeys.toString();
}
+ /**
+ * Serializes fragment-related attributes, needed for reading and resolution, into a string
+ *
+ * @param fragmenterClassName fragmenter class name
+ * @param partData table partition data
+ * @param filterInFragmenter whether filtering was done in the fragmenter
+ * @return serialized representation of fragment-related attributes
+ * @throws Exception when serialization of the attributes fails
+ */
@SuppressWarnings("unchecked")
public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception {
- String userData = null;
+ HiveUserData hiveUserData = null;
if (fragmenterClassName == null) {
throw new IllegalArgumentException("No fragmenter provided.");
@@ -440,25 +472,158 @@ public class HiveUtilities {
Class fragmenterClass = Class.forName(fragmenterClassName);
+ String inputFormatName = partData.storageDesc.getInputFormat();
+ String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib();
+ String propertiesString = serializeProperties(partData.properties);
+ String partitionKeys = serializePartitionKeys(partData);
+ String delimiter = getDelimiterCode(partData.storageDesc).toString();
+ String colTypes = partData.properties.getProperty("columns.types");
+
if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) {
- String inputFormatName = partData.storageDesc.getInputFormat();
- String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
- String partitionKeys = serializePartitionKeys(partData);
- String colTypes = partData.properties.getProperty("columns.types");
assertFileType(inputFormatName, partData);
- userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM
- + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter + HiveDataFragmenter.HIVE_UD_DELIM + colTypes;
- } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){
- String inputFormatName = partData.storageDesc.getInputFormat();
- String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
- String propertiesString = serializeProperties(partData.properties);
- String partitionKeys = serializePartitionKeys(partData);
- userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName
- + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM
- + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
- } else {
- throw new IllegalArgumentException("HiveUtilities#makeUserData is not implemented for " + fragmenterClassName);
}
- return userData.getBytes();
+
+ hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter, delimiter, colTypes);
+
+ return hiveUserData.toString().getBytes();
+ }
+
+ /**
+ * Parses raw user data into a HiveUserData instance
+ *
+ * @param input input data
+ * @param supportedSerdes serdes allowed in the current context
+ * @return instance of HiveUserData class
+ * @throws UserDataException when the raw user data doesn't contain the expected number of tokens
+ */
+ public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... supportedSerdes) throws UserDataException {
+ String userData = new String(input.getFragmentUserData());
+ String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM, HiveUserData.getNumOfTokens());
+
+ if (toks.length != (HiveUserData.getNumOfTokens())) {
+ throw new UserDataException("HiveInputFormatFragmenter expected "
+ + HiveUserData.getNumOfTokens() + " tokens, but got " + toks.length);
+ }
+
+ HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]), toks[5], toks[6]);
+
+ if (supportedSerdes.length > 0) {
+ /* Make sure this serde is supported */
+ PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName(), supportedSerdes);
+ }
+
+ return hiveUserData;
+ }
+
+ private static String getSerdeParameter(StorageDescriptor sd, String parameterKey) {
+ String parameterValue = null;
+ if (sd != null && sd.getSerdeInfo() != null && sd.getSerdeInfo().getParameters() != null && sd.getSerdeInfo().getParameters().get(parameterKey) != null) {
+ parameterValue = sd.getSerdeInfo().getParameters().get(parameterKey);
+ }
+
+ return parameterValue;
+ }
+
+ /**
+ * Extracts the field delimiter from a storage descriptor.
+ * When the delimiter cannot be extracted, the default value is used
+ *
+ * @param sd StorageDescriptor of table/partition
+ * @return ASCII code of the delimiter
+ */
+ public static Integer getDelimiterCode(StorageDescriptor sd) {
+ Integer delimiterCode = null;
+
+ String delimiter = getSerdeParameter(sd, serdeConstants.FIELD_DELIM);
+ if (delimiter != null) {
+ delimiterCode = (int) delimiter.charAt(0);
+ return delimiterCode;
+ }
+
+ delimiter = getSerdeParameter(sd, serdeConstants.SERIALIZATION_FORMAT);
+ if (delimiter != null) {
+ delimiterCode = Integer.parseInt(delimiter);
+ return delimiterCode;
+ }
+
+ return DEFAULT_DELIMITER_CODE;
+ }
+
+ /**
+ * Determines whether the metadata definition contains any complex type
+ * @see EnumHiveToHawqType for the complex type attribute definition
+ *
+ * @param metadata metadata of a relation
+ * @return true if metadata has at least one field of complex type
+ */
+ public static boolean hasComplexTypes(Metadata metadata) {
+ boolean hasComplexTypes = false;
+ List<Field> fields = metadata.getFields();
+ for (Field field: fields) {
+ if (field.isComplexType()) {
+ hasComplexTypes = true;
+ break;
+ }
+ }
+
+ return hasComplexTypes;
+ }
+
+ /**
+ * Populates the given metadata object with the given table's fields and partitions.
+ * The partition fields are added at the end of the table schema.
+ * Throws an exception if the table contains unsupported field types.
+ * Supported HCatalog types: TINYINT,
+ * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP,
+ * DATE, DECIMAL, VARCHAR, CHAR.
+ *
+ * @param tbl Hive table
+ * @param metadata schema of given table
+ */
+ public static void getSchema(Table tbl, Metadata metadata) {
+
+ int hiveColumnsSize = tbl.getSd().getColsSize();
+ int hivePartitionsSize = tbl.getPartitionKeysSize();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
+ }
+
+ // check hive fields
+ try {
+ List<FieldSchema> hiveColumns = tbl.getSd().getCols();
+ for (FieldSchema hiveCol : hiveColumns) {
+ metadata.addField(HiveUtilities.mapHiveType(hiveCol));
+ }
+ // check partition fields
+ List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
+ for (FieldSchema hivePart : hivePartitions) {
+ metadata.addField(HiveUtilities.mapHiveType(hivePart));
+ }
+ } catch (UnsupportedTypeException e) {
+ String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " +
+ e.getMessage();
+ throw new UnsupportedTypeException(errorMsg);
+ }
+ }
+
+ /**
+ * Creates an instance of the given serde type
+ *
+ * @param serdeType serde type to instantiate
+ * @param allowedSerdes serdes allowed in the current context
+ * @return instance of the given serde
+ * @throws UnsupportedTypeException if the given serde is not allowed in the current context
+ */
+ @SuppressWarnings("deprecation")
+ public static SerDe createDeserializer(PXF_HIVE_SERDES serdeType, PXF_HIVE_SERDES... allowedSerdes) throws Exception {
+ SerDe deserializer = null;
+ if (!Arrays.asList(allowedSerdes).contains(serdeType)) {
+ throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name());
+ }
+
+ deserializer = (SerDe) Utilities.createAnyInstance(serdeType.getSerdeClassName());
+
+ return deserializer;
}
}
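
The delimiter lookup above prefers the explicit field.delim serde parameter, falls back to serialization.format (which Hive stores as a numeric code), and finally defaults to comma (code 44). A dependency-free sketch of the same fallback order, with a plain map standing in for the StorageDescriptor's serde parameters:

    import java.util.HashMap;
    import java.util.Map;

    public class DelimiterCodeSketch {
        private static final int DEFAULT_DELIMITER_CODE = 44; // ','

        // Mirrors HiveUtilities.getDelimiterCode(), minus the Hive metastore classes
        static int delimiterCode(Map<String, String> serdeParams) {
            String delim = serdeParams.get("field.delim");      // serdeConstants.FIELD_DELIM
            if (delim != null) {
                return delim.charAt(0);                         // code of the first character
            }
            delim = serdeParams.get("serialization.format");    // serdeConstants.SERIALIZATION_FORMAT
            if (delim != null) {
                return Integer.parseInt(delim);                 // already a numeric code
            }
            return DEFAULT_DELIMITER_CODE;
        }

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("field.delim", "|");
            System.out.println(delimiterCode(params));                        // 124
            System.out.println(delimiterCode(new HashMap<String, String>())); // 44
        }
    }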
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
new file mode 100644
index 0000000..f36f074
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
@@ -0,0 +1,61 @@
+package org.apache.hawq.pxf.plugins.hive.utilities;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hawq.pxf.api.Metadata;
+
+/**
+ * Factory class which returns the optimal profile for a given input format
+ *
+ */
+public class ProfileFactory {
+
+ private static final String HIVE_TEXT_PROFILE = "HiveText";
+ private static final String HIVE_RC_PROFILE = "HiveRC";
+ private static final String HIVE_ORC_PROFILE = "HiveORC";
+ private static final String HIVE_PROFILE = "Hive";
+
+ /**
+ * Returns the optimal profile for the given input format
+ *
+ * @param inputFormat input format of table/partition
+ * @param hasComplexTypes whether the record has complex types, see {@link EnumHiveToHawqType}
+ * @return name of the optimal profile
+ */
+ public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
+ String profileName = null;
+ if (inputFormat instanceof TextInputFormat && !hasComplexTypes) {
+ profileName = HIVE_TEXT_PROFILE;
+ } else if (inputFormat instanceof RCFileInputFormat) {
+ profileName = HIVE_RC_PROFILE;
+ } else if (inputFormat instanceof OrcInputFormat) {
+ profileName = HIVE_ORC_PROFILE;
+ } else {
+ //Default case
+ profileName = HIVE_PROFILE;
+ }
+ return profileName;
+ }
+
+}
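
Assuming the Hadoop and Hive input format classes are on the classpath, a usage sketch (not part of the commit) of the selection logic; note that only the text path consults hasComplexTypes, so RC and ORC tables keep their specialized profiles either way:

    import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;

    public class ProfileSelection {
        public static void main(String[] args) {
            System.out.println(ProfileFactory.get(new TextInputFormat(), false));   // HiveText
            System.out.println(ProfileFactory.get(new TextInputFormat(), true));    // Hive (complex types)
            System.out.println(ProfileFactory.get(new RCFileInputFormat(), false)); // HiveRC
            System.out.println(ProfileFactory.get(new OrcInputFormat(), false));    // HiveORC
        }
    }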
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
index d9d97fc..6e40f9a 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java
@@ -132,6 +132,7 @@ public class HiveMetadataFetcherTest {
fields.add(new FieldSchema("field2", "int", null));
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(fields);
+ sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
Table hiveTable = new Table();
hiveTable.setTableType("MANAGED_TABLE");
hiveTable.setSd(sd);
@@ -176,6 +177,7 @@ public class HiveMetadataFetcherTest {
fields.add(new FieldSchema("field2", "int", null));
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(fields);
+ sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
// Mock hive tables returned from hive client
for(int index=1;index<=2;index++) {
@@ -235,6 +237,7 @@ public class HiveMetadataFetcherTest {
fields.add(new FieldSchema("field2", "int", null));
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(fields);
+ sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
Table hiveTable2 = new Table();
hiveTable2.setTableType("MANAGED_TABLE");
hiveTable2.setSd(sd);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 7bbe811..8b4bf13 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.*;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,7 +45,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({HiveORCAccessor.class, HiveInputFormatFragmenter.class, HdfsUtilities.class, HiveDataFragmenter.class})
+@PrepareForTest({HiveORCAccessor.class, HiveUtilities.class, HdfsUtilities.class, HiveDataFragmenter.class})
@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf",
"org.apache.hadoop.hive.metastore.api.MetaException",
"org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
@@ -61,8 +63,9 @@ public class HiveORCAccessorTest {
jobConf = new JobConf();
PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf);
- PowerMockito.mockStatic(HiveInputFormatFragmenter.class);
- PowerMockito.when(HiveInputFormatFragmenter.parseToks(any(InputData.class), any(String[].class))).thenReturn(new String[]{"", HiveDataFragmenter.HIVE_NO_PART_TBL, "true"});
+ PowerMockito.mockStatic(HiveUtilities.class);
+ PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "1", ""));
+
PowerMockito.mockStatic(HdfsUtilities.class);
PowerMockito.mockStatic(HiveDataFragmenter.class);