You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2017/01/24 02:32:39 UTC
[1/4] incubator-hawq git commit: HAWQ-1228. Initial commit.
Repository: incubator-hawq
Updated Branches:
refs/heads/HAWQ-1228 [created] 607184c36
HAWQ-1228. Initial commit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/0574e75f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/0574e75f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/0574e75f
Branch: refs/heads/HAWQ-1228
Commit: 0574e75fa972f6ccddd1f55a98972223b3860759
Parents: 25c87ec
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Wed Dec 28 14:03:50 2016 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Fri Jan 6 11:55:19 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/hawq/pxf/api/Metadata.java | 12 ++
.../org/apache/hawq/pxf/api/OutputFormat.java | 2 +-
.../hawq/pxf/plugins/hive/HiveAccessor.java | 14 +--
.../plugins/hive/HiveColumnarSerdeResolver.java | 62 +++++-----
.../pxf/plugins/hive/HiveDataFragmenter.java | 1 -
.../plugins/hive/HiveInputFormatFragmenter.java | 40 -------
.../pxf/plugins/hive/HiveLineBreakAccessor.java | 10 +-
.../pxf/plugins/hive/HiveMetadataFetcher.java | 41 ++++++-
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 11 +-
.../pxf/plugins/hive/HiveORCSerdeResolver.java | 17 ++-
.../pxf/plugins/hive/HiveRCFileAccessor.java | 10 +-
.../hawq/pxf/plugins/hive/HiveResolver.java | 38 +++----
.../plugins/hive/HiveStringPassResolver.java | 42 +++++--
.../hawq/pxf/plugins/hive/HiveUserData.java | 71 ++++++++++++
.../plugins/hive/utilities/HiveUtilities.java | 113 +++++++++++--------
.../pxf/plugins/hive/HiveORCAccessorTest.java | 9 +-
.../apache/hawq/pxf/service/ProfileFactory.java | 20 ++--
.../src/main/resources/pxf-profiles-default.xml | 4 +
src/backend/access/external/fileam.c | 4 +
src/backend/catalog/external/externalmd.c | 57 ++++++++--
src/bin/gpfusion/gpbridgeapi.c | 43 ++++++-
src/include/access/extprotocol.h | 1 +
src/include/access/fileam.h | 1 +
src/include/catalog/external/itemmd.h | 3 +
src/include/catalog/pg_exttable.h | 10 +-
25 files changed, 428 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index 9e1c137..7e3b92e 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.api;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
import org.apache.commons.lang.StringUtils;
@@ -124,6 +125,17 @@ public class Metadata {
*/
private List<Metadata.Field> fields;
+
+ private Set<OutputFormat> outputFormats;
+
+ public Set<OutputFormat> getOutputFormats() {
+ return outputFormats;
+ }
+
+ public void setFormats(Set<OutputFormat> outputFormats) {
+ this.outputFormats = outputFormats;
+ }
+
/**
* Constructs an item's Metadata.
*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
index 230f9ff..82a747f 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
@@ -23,4 +23,4 @@ package org.apache.hawq.pxf.api;
/**
* PXF supported output formats: {@link #TEXT} and {@link #BINARY}
*/
-public enum OutputFormat {TEXT, BINARY}
+public enum OutputFormat {TEXT, BINARY, UNKNOWN}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
index ef9f76e..ea3accb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
@@ -28,6 +28,7 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.InputFormat;
@@ -42,10 +43,6 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.hawq.pxf.api.io.DataType.*;
-import static org.apache.hawq.pxf.api.io.DataType.BPCHAR;
-import static org.apache.hawq.pxf.api.io.DataType.BYTEA;
-
/**
* Accessor for Hive tables. The accessor will open and read a split belonging
* to a Hive table. Opening a split means creating the corresponding InputFormat
@@ -138,12 +135,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
*/
private InputFormat<?, ?> createInputFormat(InputData input)
throws Exception {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
- initPartitionFields(toks[3]);
- filterInFragmenter = new Boolean(toks[4]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
return HiveDataFragmenter.makeInputFormat(
- toks[0]/* inputFormat name */, jobConf);
+ hiveUserData.getInputFormatName()/* inputFormat name */, jobConf);
}
/*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 362ac0d..2bf39ff 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -22,12 +22,15 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -57,11 +59,11 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
*/
public class HiveColumnarSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
- private ColumnarSerDeBase deserializer;
+ //private ColumnarSerDeBase deserializer;
private boolean firstColumn;
private StringBuilder builder;
private StringBuilder parts;
- private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+ private HiveUtilities.PXF_HIVE_SERDES serdeType;
public HiveColumnarSerdeResolver(InputData input) throws Exception {
super(input);
@@ -70,24 +72,22 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
/* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
@Override
void parseUserData(InputData input) throws Exception {
- String[] toks = HiveInputFormatFragmenter.parseToks(input);
- String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
- if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
- } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
- }
- else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
- }
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+ String serdeClassName = hiveUserData.getSerdeClassName();
+
+ serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(serdeClassName);
parts = new StringBuilder();
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ partitionKeys = hiveUserData.getPartitionKeys();
parseDelimiterChar(input);
}
@Override
void initPartitionFields() {
- initPartitionFields(parts);
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ initTextPartitionFields(parts);
+ } else {
+ super.initPartitionFields();
+ }
}
/**
@@ -97,15 +97,19 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
*/
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
- firstColumn = true;
- builder = new StringBuilder();
- Object tuple = deserializer.deserialize((Writable) onerow.getData());
- ObjectInspector oi = deserializer.getObjectInspector();
-
- traverseTuple(tuple, oi);
- /* We follow Hive convention. Partition fields are always added at the end of the record */
- builder.append(parts);
- return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ firstColumn = true;
+ builder = new StringBuilder();
+ Object tuple = deserializer.deserialize((Writable) onerow.getData());
+ ObjectInspector oi = deserializer.getObjectInspector();
+
+ traverseTuple(tuple, oi);
+ /* We follow Hive convention. Partition fields are always added at the end of the record */
+ builder.append(parts);
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+ } else {
+ return super.getFields(onerow);
+ }
}
/*
@@ -138,9 +142,10 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
- if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
+ //TODO: Move this logic to utilities
+ if (serdeType == HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
deserializer = new ColumnarSerDe();
- } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
+ } else if (serdeType == HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
deserializer = new LazyBinaryColumnarSerDe();
} else {
throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
@@ -233,4 +238,9 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
}
firstColumn = false;
}
+
+ @Override
+ void parseDelimiterChar(InputData input) {
+ delimiter = 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 2d2b53e..97f278d 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -78,7 +78,6 @@ public class HiveDataFragmenter extends Fragmenter {
private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class);
private static final short ALL_PARTS = -1;
- public static final String HIVE_UD_DELIM = "!HUDD!";
public static final String HIVE_1_PART_DELIM = "!H1PD!";
public static final String HIVE_PARTITIONS_DELIM = "!HPAD!";
public static final String HIVE_NO_PART_TBL = "!HNPT!";
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index 051a246..4449d8f 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -55,10 +55,6 @@ import java.util.List;
*/
public class HiveInputFormatFragmenter extends HiveDataFragmenter {
private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
- private static final int EXPECTED_NUM_OF_TOKS = 3;
- public static final int TOK_SERDE = 0;
- public static final int TOK_KEYS = 1;
- public static final int TOK_FILTER_DONE = 2;
/** Defines the Hive input formats currently supported in pxf */
public enum PXF_HIVE_INPUT_FORMATS {
@@ -67,14 +63,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
ORC_FILE_INPUT_FORMAT
}
- /** Defines the Hive serializers (serde classes) currently supported in pxf */
- public enum PXF_HIVE_SERDES {
- COLUMNAR_SERDE,
- LAZY_BINARY_COLUMNAR_SERDE,
- LAZY_SIMPLE_SERDE,
- ORC_SERDE
- }
-
/**
* Constructs a HiveInputFormatFragmenter.
*
@@ -84,34 +72,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
super(inputData, HiveInputFormatFragmenter.class);
}
- /**
- * Extracts the user data:
- * serde, partition keys and whether filter was included in fragmenter
- *
- * @param input input data from client
- * @param supportedSerdes supported serde names
- * @return parsed tokens
- * @throws UserDataException if user data contains unsupported serde
- * or wrong number of tokens
- */
- static public String[] parseToks(InputData input, String... supportedSerdes)
- throws UserDataException {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HIVE_UD_DELIM);
- if (supportedSerdes.length > 0
- && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) {
- throw new UserDataException(toks[TOK_SERDE]
- + " serializer isn't supported by " + input.getAccessor());
- }
-
- if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
- throw new UserDataException("HiveInputFormatFragmenter expected "
- + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
- }
-
- return toks;
- }
-
/*
* Checks that hive fields and partitions match the HAWQ schema. Throws an
* exception if: - the number of fields (+ partitions) do not match the HAWQ
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
index ed4f908..66680bb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
@@ -21,12 +21,12 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Specialization of HiveAccessor for a Hive table stored as Text files.
@@ -43,9 +43,9 @@ public class HiveLineBreakAccessor extends HiveAccessor {
public HiveLineBreakAccessor(InputData input) throws Exception {
super(input, new TextInputFormat());
((TextInputFormat) inputFormat).configure(jobConf);
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 91f91e7..3d04bc1 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -21,19 +21,27 @@ package org.apache.hawq.pxf.plugins.hive;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.ProfilesConf;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.service.ProfileFactory;
/**
* Class for connecting to Hive's MetaStore and getting schema of Hive tables.
@@ -42,12 +50,14 @@ public class HiveMetadataFetcher extends MetadataFetcher {
private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
private HiveMetaStoreClient client;
+ private JobConf jobConf;
public HiveMetadataFetcher(InputData md) {
super(md);
// init hive metastore client connection.
client = HiveUtilities.initHiveClient();
+ jobConf = new JobConf(new Configuration());
}
/**
@@ -84,6 +94,21 @@ public class HiveMetadataFetcher extends MetadataFetcher {
Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
getSchema(tbl, metadata);
metadataList.add(metadata);
+ List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1);
+ Set<OutputFormat> formats = new HashSet<OutputFormat>();
+ //If table has partitions - find out all formats
+ for (Partition tablePartition : tablePartitions) {
+ String inputFormat = tablePartition.getSd().getInputFormat();
+ OutputFormat outputFormat = getOutputFormat(inputFormat);
+ formats.add(outputFormat);
+ }
+ //If table has no partitions - get single format of table
+ if (tablePartitions.size() == 0 ) {
+ String inputFormat = tbl.getSd().getInputFormat();
+ OutputFormat outputFormat = getOutputFormat(inputFormat);
+ formats.add(outputFormat);
+ }
+ metadata.setFormats(formats);
} catch (UnsupportedTypeException | UnsupportedOperationException e) {
if(ignoreErrors) {
LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage());
@@ -135,4 +160,18 @@ public class HiveMetadataFetcher extends MetadataFetcher {
throw new UnsupportedTypeException(errorMsg);
}
}
+
+ private OutputFormat getOutputFormat(String inputFormat) {
+ OutputFormat outputFormat = OutputFormat.UNKNOWN;
+ try {
+ InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf);
+ String profile = ProfileFactory.get(fformat);
+ String outputFormatString = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT");
+ outputFormat = OutputFormat.valueOf(outputFormatString);
+ } catch (Exception e) {
+ LOG.warn("Unable to get output format for input format: " + inputFormat);
+ }
+ return outputFormat;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index dc195f4..be29eec 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -30,6 +30,9 @@ import org.apache.hawq.pxf.api.BasicFilter;
import org.apache.hawq.pxf.api.LogicalFilter;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+//import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+//import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
import org.apache.commons.lang.StringUtils;
import java.sql.Date;
@@ -37,7 +40,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Specialization of HiveAccessor for a Hive table that stores only ORC files.
@@ -61,9 +64,9 @@ public class HiveORCAccessor extends HiveAccessor {
*/
public HiveORCAccessor(InputData input) throws Exception {
super(input, new OrcInputFormat());
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.ORC_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
index 7673713..381c407 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -34,6 +34,7 @@ import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
import java.util.*;
@@ -44,7 +45,7 @@ import java.util.*;
public class HiveORCSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
private OrcSerde deserializer;
- private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+ private HiveUtilities.PXF_HIVE_SERDES serdeType;
public HiveORCSerdeResolver(InputData input) throws Exception {
super(input);
@@ -53,14 +54,9 @@ public class HiveORCSerdeResolver extends HiveResolver {
/* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
@Override
void parseUserData(InputData input) throws Exception {
- String[] toks = HiveInputFormatFragmenter.parseToks(input);
- String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
- if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
- } else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
- }
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE);
+ serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName());
+ partitionKeys = hiveUserData.getPartitionKeys();
collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
: input.getUserProperty("COLLECTION_DELIM");
mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
@@ -72,6 +68,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
* OneField item contains two fields: an integer representing the VARCHAR type and a Java
* Object representing the field value.
*/
+ //TODO: It's the same as in parent class
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
@@ -117,7 +114,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
- if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
+ if (serdeType == HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE) {
deserializer = new OrcSerde();
} else {
throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
index 2686851..7132d7b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
@@ -21,7 +21,7 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.mapred.FileSplit;
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Specialization of HiveAccessor for a Hive table that stores only RC files.
@@ -47,9 +47,9 @@ public class HiveRCFileAccessor extends HiveAccessor {
*/
public HiveRCFileAccessor(InputData input) throws Exception {
super(input, new RCFileInputFormat());
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.COLUMNAR_SERDE, PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+ initPartitionFields(hiveUserData.getPartitionKeys());
+ filterInFragmenter = hiveUserData.isFilterInFragmenter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index 3837f78..55d7205 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -27,6 +27,7 @@ import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.commons.lang.CharUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,10 +75,13 @@ public class HiveResolver extends Plugin implements ReadResolver {
protected static final String COLLECTION_DELIM = ",";
protected String collectionDelim;
protected String mapkeyDelim;
- private SerDe deserializer;
+ //private SerDe deserializer;
+ protected SerDe deserializer;
private List<OneField> partitionFields;
- private String serdeName;
- private String propsString;
+ //private String serdeClassName;
+ protected String serdeClassName;
+ //private String propsString;
+ protected String propsString;
String partitionKeys;
protected char delimiter;
String nullChar = "\\N";
@@ -135,17 +139,11 @@ public class HiveResolver extends Plugin implements ReadResolver {
void parseUserData(InputData input) throws Exception {
final int EXPECTED_NUM_OF_TOKS = 5;
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
- if (toks.length != EXPECTED_NUM_OF_TOKS) {
- throw new UserDataException("HiveResolver expected "
- + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
- }
-
- serdeName = toks[1];
- propsString = toks[2];
- partitionKeys = toks[3];
+ serdeClassName = hiveUserData.getSerdeClassName();
+ propsString = hiveUserData.getPropertiesString();
+ partitionKeys = hiveUserData.getPartitionKeys();
collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
: input.getUserProperty("COLLECTION_DELIM");
@@ -160,14 +158,14 @@ public class HiveResolver extends Plugin implements ReadResolver {
void initSerde(InputData inputData) throws Exception {
Properties serdeProperties;
- Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader());
+ Class<?> c = Class.forName(serdeClassName, true, JavaUtils.getClassLoader());
deserializer = (SerDe) c.newInstance();
serdeProperties = new Properties();
- ByteArrayInputStream inStream = new ByteArrayInputStream(
- propsString.getBytes());
- serdeProperties.load(inStream);
- deserializer.initialize(new JobConf(conf, HiveResolver.class),
- serdeProperties);
+ if (propsString != null ) {
+ ByteArrayInputStream inStream = new ByteArrayInputStream(propsString.getBytes());
+ serdeProperties.load(inStream);
+ }
+ deserializer.initialize(new JobConf(conf, HiveResolver.class), serdeProperties);
}
/*
@@ -271,7 +269,7 @@ public class HiveResolver extends Plugin implements ReadResolver {
* The partition fields are initialized one time based on userData provided
* by the fragmenter.
*/
- void initPartitionFields(StringBuilder parts) {
+ void initTextPartitionFields(StringBuilder parts) {
if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index fdc5f69..c7cfb36 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -22,7 +22,10 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
import java.util.Collections;
import java.util.List;
@@ -42,21 +45,34 @@ public class HiveStringPassResolver extends HiveResolver {
@Override
void parseUserData(InputData input) throws Exception {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
parseDelimiterChar(input);
parts = new StringBuilder();
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ partitionKeys = hiveUserData.getPartitionKeys();
+ serdeClassName = hiveUserData.getSerdeClassName();
+
+ /* Needed only for BINARY format*/
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.BINARY) {
+ propsString = hiveUserData.getPropertiesString();
+ }
}
@Override
- void initSerde(InputData input) {
- /* nothing to do here */
+ void initSerde(InputData input) throws Exception {
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ /* nothing to do here */
+ } else {
+ super.initSerde(input);
+ }
}
@Override
void initPartitionFields() {
- initPartitionFields(parts);
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ initTextPartitionFields(parts);
+ } else {
+ super.initPartitionFields();
+ }
}
/**
@@ -66,9 +82,17 @@ public class HiveStringPassResolver extends HiveResolver {
*/
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
- String line = (onerow.getData()).toString();
+ if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+ String line = (onerow.getData()).toString();
+
+ /* We follow Hive convention. Partition fields are always added at the end of the record */
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+ } else {
+ return super.getFields(onerow);
+ }
+ }
- /* We follow Hive convention. Partition fields are always added at the end of the record */
- return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+ void parseDelimiterChar(InputData input) {
+ this.delimiter = 1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
new file mode 100644
index 0000000..710700a
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.plugins.hive;
+
/**
 * Value object for the metadata the Hive fragmenter passes to accessors and
 * resolvers as fragment user data. Serialized by {@link #toString()} with
 * {@link #HIVE_UD_DELIM} as the field separator, and parsed back from that
 * form by {@code HiveUtilities.parseHiveUserData}.
 */
public class HiveUserData {

    /**
     * Delimiter separating the serialized fields. Must never occur inside a
     * field value, or round-tripping through toString()/split will break.
     */
    public static final String HIVE_UD_DELIM = "!HUDD!";

    private final String inputFormatName;
    private final String serdeClassName;
    private final String propertiesString;
    private final String partitionKeys;
    private final boolean filterInFragmenter;

    /**
     * @param inputFormatName    fully-qualified class name of the Hive input format
     * @param serdeClassName     fully-qualified class name of the serde
     * @param propertiesString   serialized table/partition properties (may be null)
     * @param partitionKeys      serialized partition keys
     * @param filterInFragmenter whether filtering was already applied by the fragmenter
     */
    public HiveUserData(String inputFormatName, String serdeClassName,
                        String propertiesString, String partitionKeys,
                        boolean filterInFragmenter) {
        this.inputFormatName = inputFormatName;
        this.serdeClassName = serdeClassName;
        this.propertiesString = propertiesString;
        this.partitionKeys = partitionKeys;
        this.filterInFragmenter = filterInFragmenter;
    }

    public String getInputFormatName() {
        return inputFormatName;
    }

    public String getSerdeClassName() {
        return serdeClassName;
    }

    public String getPropertiesString() {
        return propertiesString;
    }

    public String getPartitionKeys() {
        return partitionKeys;
    }

    public boolean isFilterInFragmenter() {
        return filterInFragmenter;
    }

    /**
     * Serializes the fields in the exact token order expected by
     * {@code HiveUtilities.parseHiveUserData}: input format, serde,
     * properties, partition keys, filter flag. Null fields serialize
     * as the literal string "null".
     */
    @Override
    public String toString() {
        return inputFormatName + HiveUserData.HIVE_UD_DELIM
                + serdeClassName + HiveUserData.HIVE_UD_DELIM
                + propertiesString + HiveUserData.HIVE_UD_DELIM
                + partitionKeys + HiveUserData.HIVE_UD_DELIM
                + filterInFragmenter;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index ffd66b8..b78c379 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -39,13 +39,16 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
+import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
-import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.plugins.hive.HiveUserData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
/**
* Class containing helper functions connecting
@@ -53,6 +56,38 @@ import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDE
*/
public class HiveUtilities {
+ /** Defines the Hive serializers (serde classes) currently supported in pxf */
+ public enum PXF_HIVE_SERDES {
+ COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"),
+ LAZY_BINARY_COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"),
+ LAZY_SIMPLE_SERDE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+ ORC_SERDE("org.apache.hadoop.hive.ql.io.orc.OrcSerde");
+
+ private String serdeClassName;
+
+ PXF_HIVE_SERDES(String serdeClassName) {
+ this.serdeClassName = serdeClassName;
+ }
+
+ public static PXF_HIVE_SERDES getPxfHiveSerde(String serdeClassName, PXF_HIVE_SERDES... allowedSerdes) {
+ for (PXF_HIVE_SERDES s : values()) {
+ if (s.getSerdeClassName().equals(serdeClassName)) {
+
+ if (allowedSerdes.length > 0
+ && !Arrays.asList(allowedSerdes).contains(s)) {
+ throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeClassName);
+ }
+ return s;
+ }
+ }
+ throw new UnsupportedTypeException("Unable to find serde for class name: "+ serdeClassName);
+ }
+
+ public String getSerdeClassName() {
+ return serdeClassName;
+ }
+ }
+
private static final Log LOG = LogFactory.getLog(HiveUtilities.class);
private static final String WILDCARD = "*";
@@ -64,10 +99,7 @@ public class HiveUtilities {
static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
- static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
- static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
- static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
- static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+ private static final int EXPECTED_NUM_OF_TOKS = 5;
/**
* Initializes the HiveMetaStoreClient
@@ -376,31 +408,6 @@ public class HiveUtilities {
}
}
- /*
- * Validates that partition serde corresponds to PXF supported serdes and
- * transforms the class name to an enumeration for writing it to the
- * resolvers on other PXF instances.
- */
- private static String assertSerde(String className, HiveTablePartition partData)
- throws Exception {
- switch (className) {
- case STR_COLUMNAR_SERDE:
- return PXF_HIVE_SERDES.COLUMNAR_SERDE.name();
- case STR_LAZY_BINARY_COLUMNAR_SERDE:
- return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
- case STR_LAZY_SIMPLE_SERDE:
- return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
- case STR_ORC_SERDE:
- return PXF_HIVE_SERDES.ORC_SERDE.name();
- default:
- throw new UnsupportedTypeException(
- "HiveInputFormatFragmenter does not yet support "
- + className + " for " + partData
- + ". Supported serializers are: "
- + Arrays.toString(PXF_HIVE_SERDES.values()));
- }
- }
-
/* Turns the partition keys into a string */
public static String serializePartitionKeys(HiveTablePartition partData) throws Exception {
@@ -432,7 +439,7 @@ public class HiveUtilities {
@SuppressWarnings("unchecked")
public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception {
- String userData = null;
+ HiveUserData hiveUserData = null;
if (fragmenterClassName == null) {
throw new IllegalArgumentException("No fragmenter provided.");
@@ -440,24 +447,36 @@ public class HiveUtilities {
Class fragmenterClass = Class.forName(fragmenterClassName);
+ String inputFormatName = partData.storageDesc.getInputFormat();
+ String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib();
+ String propertiesString = serializeProperties(partData.properties);
+ String partitionKeys = serializePartitionKeys(partData);
+
if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) {
- String inputFormatName = partData.storageDesc.getInputFormat();
- String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
- String partitionKeys = serializePartitionKeys(partData);
assertFileType(inputFormatName, partData);
- userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM
- + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
- } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){
- String inputFormatName = partData.storageDesc.getInputFormat();
- String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
- String propertiesString = serializeProperties(partData.properties);
- String partitionKeys = serializePartitionKeys(partData);
- userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName
- + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM
- + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
- } else {
- throw new IllegalArgumentException("HiveUtilities#makeUserData is not implemented for " + fragmenterClassName);
}
- return userData.getBytes();
+
+ hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter);
+
+ return hiveUserData.toString().getBytes();
+ }
+
+ public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... supportedSerdes) throws UserDataException{
+ String userData = new String(input.getFragmentUserData());
+ String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM);
+
+ if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
+ throw new UserDataException("HiveInputFormatFragmenter expected "
+ + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
+ }
+
+ HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]));
+
+ if (supportedSerdes.length > 0) {
+ /* Make sure this serde is supported */
+ PXF_HIVE_SERDES pxfHiveSerde = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName(), supportedSerdes);
+ }
+
+ return hiveUserData;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 7bbe811..1d90d01 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.*;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,7 +45,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({HiveORCAccessor.class, HiveInputFormatFragmenter.class, HdfsUtilities.class, HiveDataFragmenter.class})
+@PrepareForTest({HiveORCAccessor.class, HiveUtilities.class, HdfsUtilities.class, HiveDataFragmenter.class})
@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf",
"org.apache.hadoop.hive.metastore.api.MetaException",
"org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
@@ -61,8 +63,9 @@ public class HiveORCAccessorTest {
jobConf = new JobConf();
PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf);
- PowerMockito.mockStatic(HiveInputFormatFragmenter.class);
- PowerMockito.when(HiveInputFormatFragmenter.parseToks(any(InputData.class), any(String[].class))).thenReturn(new String[]{"", HiveDataFragmenter.HIVE_NO_PART_TBL, "true"});
+ PowerMockito.mockStatic(HiveUtilities.class);
+ PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true));
+
PowerMockito.mockStatic(HdfsUtilities.class);
PowerMockito.mockStatic(HiveDataFragmenter.class);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
index fc5ed0f..d053760 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
@@ -19,26 +19,30 @@ package org.apache.hawq.pxf.service;
* under the License.
*/
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
public class ProfileFactory {
private static final String HIVE_TEXT_PROFILE = "HiveText";
private static final String HIVE_RC_PROFILE = "HiveRC";
private static final String HIVE_ORC_PROFILE = "HiveORC";
+ private static final String HIVE_PROFILE = "Hive";
- public static String get(InputFormat inputFormat) throws Exception {
+ public static String get(InputFormat inputFormat) {
String profileName = null;
- // TODO: Uncomment in process of HAWQ-1228 implementation
- //if (inputFormat instanceof TextInputFormat) {
- // profileName = HIVE_TEXT_PROFILE;
- //} else if (inputFormat instanceof RCFileInputFormat) {
- // profileName = HIVE_RC_PROFILE;
- /*} else */if (inputFormat instanceof OrcInputFormat) {
+ if (inputFormat instanceof TextInputFormat) {
+ profileName = HIVE_TEXT_PROFILE;
+ } else if (inputFormat instanceof RCFileInputFormat) {
+ profileName = HIVE_RC_PROFILE;
+ } else if (inputFormat instanceof OrcInputFormat) {
profileName = HIVE_ORC_PROFILE;
+ } else {
+ //Default case
+ profileName = HIVE_PROFILE;
}
-
return profileName;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index 1edb6d5..a3f21f6 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -49,6 +49,7 @@ under the License.
<accessor>org.apache.hawq.pxf.plugins.hive.HiveAccessor</accessor>
<resolver>org.apache.hawq.pxf.plugins.hive.HiveResolver</resolver>
<metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+ <outputFormat>BINARY</outputFormat>
</plugins>
</profile>
<profile>
@@ -63,6 +64,7 @@ under the License.
<accessor>org.apache.hawq.pxf.plugins.hive.HiveRCFileAccessor</accessor>
<resolver>org.apache.hawq.pxf.plugins.hive.HiveColumnarSerdeResolver</resolver>
<metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+ <outputFormat>TEXT</outputFormat>
</plugins>
</profile>
<profile>
@@ -76,6 +78,7 @@ under the License.
<accessor>org.apache.hawq.pxf.plugins.hive.HiveLineBreakAccessor</accessor>
<resolver>org.apache.hawq.pxf.plugins.hive.HiveStringPassResolver</resolver>
<metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+ <outputFormat>TEXT</outputFormat>
</plugins>
</profile>
<profile>
@@ -89,6 +92,7 @@ under the License.
<accessor>org.apache.hawq.pxf.plugins.hive.HiveORCAccessor</accessor>
<resolver>org.apache.hawq.pxf.plugins.hive.HiveORCSerdeResolver</resolver>
<metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+ <outputFormat>BINARY</outputFormat>
</plugins>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 70a115a..20f662d 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -461,8 +461,12 @@ external_stopscan(FileScanDesc scan)
ExternalSelectDesc
external_getnext_init(PlanState *state) {
ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData));
+
if (state != NULL)
+ {
desc->projInfo = state->ps_ProjInfo;
+ desc->fmttype = &((ExternalScan *) state->plan)->fmtType;
+ }
return desc;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/backend/catalog/external/externalmd.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c
index 0e39d25..ccdbdd6 100644
--- a/src/backend/catalog/external/externalmd.c
+++ b/src/backend/catalog/external/externalmd.c
@@ -57,6 +57,8 @@ static void LoadDistributionPolicy(Oid relid, PxfItem *pxfItem);
static void LoadExtTable(Oid relid, PxfItem *pxfItem);
static void LoadColumns(Oid relid, List *columns);
static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod);
+static Datum GetFormatTypeForProfile(const List *outputFormats);
+static Datum GetFormatOptionsForProfile(const List *outputFormats);
const int maxNumTypeModifiers = 2;
/*
@@ -128,7 +130,22 @@ static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile)
pxfItem->profile = profile;
pxfItem->path = pstrdup(json_object_get_string(itemPath));
pxfItem->name = pstrdup(json_object_get_string(itemName));
-
+
+ /* parse output formats */
+ struct json_object *jsonOutputFormats = json_object_object_get(pxfMD, "outputFormats");
+
+ if (NULL != jsonOutputFormats)
+ {
+ const int numOutputFormats = json_object_array_length(jsonOutputFormats);
+ for (int i = 0; i < numOutputFormats; i++)
+ {
+ PxfField *pxfField = palloc0(sizeof(PxfField));
+ struct json_object *jsonOutputFormat = json_object_array_get_idx(jsonOutputFormats, i);
+ char *outupFormat = pstrdup(json_object_get_string(jsonOutputFormat));
+ pxfItem->outputFormats = lappend(pxfItem->outputFormats, outupFormat);
+ }
+ }
+
elog(DEBUG1, "Parsed item %s, namespace %s", itemName, itemPath);
/* parse columns */
@@ -464,17 +481,10 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem)
Assert(NULL != astate);
Datum location = makeArrayResult(astate, CurrentMemoryContext);
- /* format options - should be "formatter 'pxfwritable_import'" */
- StringInfoData formatStr;
- initStringInfo(&formatStr);
- appendStringInfo(&formatStr, "formatter 'pxfwritable_import'");
- Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data));
- pfree(formatStr.data);
-
values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid);
values[Anum_pg_exttable_location - 1] = location;
- values[Anum_pg_exttable_fmttype - 1] = CharGetDatum('b' /* binary */);
- values[Anum_pg_exttable_fmtopts - 1] = format_opts;
+ values[Anum_pg_exttable_fmttype - 1] = GetFormatTypeForProfile(pxfItem->outputFormats);
+ values[Anum_pg_exttable_fmtopts - 1] = GetFormatOptionsForProfile(pxfItem->outputFormats);
nulls[Anum_pg_exttable_command - 1] = true;
nulls[Anum_pg_exttable_rejectlimit - 1] = true;
nulls[Anum_pg_exttable_rejectlimittype - 1] = true;
@@ -631,3 +641,30 @@ static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nT
return VARHDRSZ + result;
}
+static Datum GetFormatTypeForProfile(const List *outputFormats)
+{
+
+ if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0)
+ {
+ return CharGetDatum(TextFormatType);
+ } else
+ {
+ return CharGetDatum(CustomFormatType);
+ }
+}
+
+static Datum GetFormatOptionsForProfile(const List *outputFormats)
+{
+ StringInfoData formatStr;
+ initStringInfo(&formatStr);
+ if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0)
+ {
+ appendStringInfo(&formatStr, "delimiter '\x01' null '\N' escape '\'");
+ } else {
+ appendStringInfo(&formatStr, "formatter 'pxfwritable_import'");
+ }
+ Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data));
+ pfree(formatStr.data);
+ return format_opts;
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index b524df8..c5c217c 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -49,7 +49,7 @@ gphadoop_context* create_context(PG_FUNCTION_ARGS);
void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS);
void append_churl_header_if_exists(gphadoop_context* context,
const char* key, const char* value);
-void set_current_fragment_headers(gphadoop_context* context);
+void set_current_fragment_headers(gphadoop_context* context, char *fmttype);
void gpbridge_import_start(PG_FUNCTION_ARGS);
void gpbridge_export_start(PG_FUNCTION_ARGS);
PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel);
@@ -62,6 +62,7 @@ void build_uri_for_write(gphadoop_context* context, PxfServer* rest_server);
size_t fill_buffer(gphadoop_context* context, char* start, size_t size);
void add_delegation_token(PxfInputData *inputData);
void free_token_resources(PxfInputData *inputData);
+static void assign_optimal_supported_profile(char *profile, char *fmttype, char **supportedProfile, char **supportedFormat);
/* Custom protocol entry point for read
*/
@@ -207,7 +208,7 @@ void append_churl_header_if_exists(gphadoop_context* context, const char* key, c
* 2. X-GP-FRAGMENT-USER-DATA header is changed to the current fragment's user data.
* If the fragment doesn't have user data, the header will be removed.
*/
-void set_current_fragment_headers(gphadoop_context* context)
+void set_current_fragment_headers(gphadoop_context* context, char *fmttype)
{
FragmentData* frag_data = (FragmentData*)lfirst(context->current_fragment);
elog(DEBUG2, "pxf: set_current_fragment_source_name: source_name %s, index %s, has user data: %s ",
@@ -229,11 +230,21 @@ void set_current_fragment_headers(gphadoop_context* context)
/* if current fragment has optimal profile set it*/
if (frag_data->profile)
{
- churl_headers_override(context->churl_headers, "X-GP-PROFILE", frag_data->profile);
+ char *supportedProfile = NULL;
+ char *supportedFormat = NULL;
+ assign_optimal_supported_profile(frag_data->profile, fmttype, &supportedProfile, &supportedFormat);
+
+ churl_headers_override(context->churl_headers, "X-GP-PROFILE", supportedProfile);
+ churl_headers_override(context->churl_headers, "X-GP-FORMAT", supportedFormat);
} else if (context->gphd_uri->profile)
{
/* if current fragment doesn't have any optimal profile, set to use profile from url */
- churl_headers_override(context->churl_headers, "X-GP-PROFILE", context->gphd_uri->profile);
+ char *supportedProfile = NULL;
+ char *supportedFormat = NULL;
+ assign_optimal_supported_profile(context->gphd_uri->profile, fmttype, &supportedProfile, &supportedFormat);
+
+ churl_headers_override(context->churl_headers, "X-GP-PROFILE", supportedProfile);
+ churl_headers_override(context->churl_headers, "X-GP-FORMAT", supportedFormat);
}
/* if there is no profile passed in url, we expect to have accessor+fragmenter+resolver so no action needed by this point */
@@ -249,7 +260,8 @@ void gpbridge_import_start(PG_FUNCTION_ARGS)
context->churl_headers = churl_headers_init();
add_querydata_to_http_header(context, fcinfo);
- set_current_fragment_headers(context);
+ char *fmttype = EXTPROTOCOL_GET_FMTTYPE(fcinfo);
+ set_current_fragment_headers(context, fmttype);
context->churl_handle = churl_init_download(context->uri.data,
context->churl_headers);
@@ -399,6 +411,7 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
size_t gpbridge_read(PG_FUNCTION_ARGS)
{
char* databuf;
+ char* fmttype;
size_t datalen;
size_t n = 0;
gphadoop_context* context;
@@ -406,6 +419,7 @@ size_t gpbridge_read(PG_FUNCTION_ARGS)
context = EXTPROTOCOL_GET_USER_CTX(fcinfo);
databuf = EXTPROTOCOL_GET_DATABUF(fcinfo);
datalen = EXTPROTOCOL_GET_DATALEN(fcinfo);
+ fmttype = EXTPROTOCOL_GET_FMTTYPE(fcinfo);
while ((n = fill_buffer(context, databuf, datalen)) == 0)
{
@@ -419,7 +433,7 @@ size_t gpbridge_read(PG_FUNCTION_ARGS)
if (context->current_fragment == NULL)
return 0;
- set_current_fragment_headers(context);
+ set_current_fragment_headers(context, fmttype);
churl_download_restart(context->churl_handle, context->uri.data, context->churl_headers);
/* read some bytes to make sure the connection is established */
@@ -547,3 +561,20 @@ void free_token_resources(PxfInputData *inputData)
pfree(inputData->token);
}
+
/*
 * Chooses a profile/format pair the table's format type can consume.
 * Text-format tables may keep the text-oriented Hive profiles; custom
 * ('b') tables keep their profile with GPDBWritable; any other combination
 * falls back to the generic "Hive" profile with GPDBWritable.
 */
static void assign_optimal_supported_profile(char *profile, char *fmttype,
											 char **supportedProfile, char **supportedFormat)
{
	/*
	 * Profile names must match ProfileFactory exactly: "HiveText" and
	 * "HiveRC". The original compared against "HiveRc", so the HiveRC/TEXT
	 * branch could never be taken.
	 */
	if (fmttype_is_text(*fmttype) &&
		((strcmp(profile, "HiveText") == 0) || (strcmp(profile, "HiveRC") == 0)))
	{
		*supportedFormat = "TEXT";
		*supportedProfile = profile;
	}
	else if (fmttype_is_custom(*fmttype))
	{
		*supportedFormat = "GPDBWritable";
		*supportedProfile = profile;
	}
	else
	{
		*supportedFormat = "GPDBWritable";
		*supportedProfile = "Hive";
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index 4b69bb7..4c37ac6 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -66,6 +66,7 @@ typedef ExtProtocolData *ExtProtocol;
#define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx)
#define EXTPROTOCOL_GET_SELECTDESC(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc)
#define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo)
+#define EXTPROTOCOL_GET_FMTTYPE(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->fmttype)
#define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call)
#define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 1e926d5..df8c284 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -70,6 +70,7 @@ typedef ExternalInsertDescData *ExternalInsertDesc;
typedef struct ExternalSelectDescData
{
ProjectionInfo *projInfo;
+ char *fmttype;
} ExternalSelectDescData;
typedef enum DataLineStatus
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/catalog/external/itemmd.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/external/itemmd.h b/src/include/catalog/external/itemmd.h
index e6dad63..5717a53 100644
--- a/src/include/catalog/external/itemmd.h
+++ b/src/include/catalog/external/itemmd.h
@@ -67,6 +67,9 @@ typedef struct PxfItem
/* fields */
List *fields;
+
+ /* output formats*/
+ List *outputFormats;
} PxfItem;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/catalog/pg_exttable.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_exttable.h b/src/include/catalog/pg_exttable.h
index 3a0fadd..ae2fb00 100644
--- a/src/include/catalog/pg_exttable.h
+++ b/src/include/catalog/pg_exttable.h
@@ -164,8 +164,12 @@ GetExtTableEntry(Oid relid);
extern void
RemoveExtTableEntry(Oid relid);
-#define fmttype_is_custom(c) (c == 'b')
-#define fmttype_is_text(c) (c == 't')
-#define fmttype_is_csv(c) (c == 'c')
+#define CustomFormatType 'b'
+#define TextFormatType 't'
+#define CsvFormatType 'c'
+
+#define fmttype_is_custom(c) (c == CustomFormatType)
+#define fmttype_is_text(c) (c == TextFormatType)
+#define fmttype_is_csv(c) (c == CsvFormatType)
#endif /* PG_EXTTABLE_H */
[2/4] incubator-hawq git commit: HAWQ-1228. Hardcoded comma as
delimiter.
Posted by od...@apache.org.
HAWQ-1228. Hardcoded comma as delimiter.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8fe27032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8fe27032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8fe27032
Branch: refs/heads/HAWQ-1228
Commit: 8fe27032f1faf786544a5109e854d319f6e6a1c4
Parents: 0574e75
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Fri Jan 6 18:39:32 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Fri Jan 6 18:39:32 2017 -0800
----------------------------------------------------------------------
.../org/apache/hawq/pxf/api/OutputFormat.java | 2 +-
.../plugins/hive/HiveColumnarSerdeResolver.java | 2 +-
.../pxf/plugins/hive/HiveMetadataFetcher.java | 2 +-
.../pxf/plugins/hive/HiveORCSerdeResolver.java | 22 --------------------
.../hawq/pxf/plugins/hive/HiveResolver.java | 3 ---
.../plugins/hive/HiveStringPassResolver.java | 2 +-
src/backend/catalog/external/externalmd.c | 2 +-
src/bin/gpfusion/gpbridgeapi.c | 4 ++--
src/include/access/hd_work_mgr.h | 2 ++
9 files changed, 9 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
index 82a747f..230f9ff 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
@@ -23,4 +23,4 @@ package org.apache.hawq.pxf.api;
/**
* PXF supported output formats: {@link #TEXT} and {@link #BINARY}
*/
-public enum OutputFormat {TEXT, BINARY, UNKNOWN}
+public enum OutputFormat {TEXT, BINARY}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 2bf39ff..8fd37e0 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -241,6 +241,6 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
@Override
void parseDelimiterChar(InputData input) {
- delimiter = 1;
+ delimiter = 44; //,
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 3d04bc1..76e8e18 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -162,7 +162,7 @@ public class HiveMetadataFetcher extends MetadataFetcher {
}
private OutputFormat getOutputFormat(String inputFormat) {
- OutputFormat outputFormat = OutputFormat.UNKNOWN;
+ OutputFormat outputFormat = null;
try {
InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf);
String profile = ProfileFactory.get(fformat);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
index 381c407..0c7a417 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -44,7 +44,6 @@ import java.util.*;
*/
public class HiveORCSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
- private OrcSerde deserializer;
private HiveUtilities.PXF_HIVE_SERDES serdeType;
public HiveORCSerdeResolver(InputData input) throws Exception {
@@ -63,27 +62,6 @@ public class HiveORCSerdeResolver extends HiveResolver {
: input.getUserProperty("MAPKEY_DELIM");
}
- /**
- * getFields returns a singleton list of OneField item.
- * OneField item contains two fields: an integer representing the VARCHAR type and a Java
- * Object representing the field value.
- */
- //TODO: It's the same as in parent class
- @Override
- public List<OneField> getFields(OneRow onerow) throws Exception {
-
- Object tuple = deserializer.deserialize((Writable) onerow.getData());
- // Each Hive record is a Struct
- StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
- List<OneField> record = traverseStruct(tuple, soi, false);
-
- //Add partition fields if any
- record.addAll(getPartitionFields());
-
- return record;
-
- }
-
/*
* Get and init the deserializer for the records of this Hive data fragment.
* Suppress Warnings added because deserializer.initialize is an abstract function that is deprecated
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index 55d7205..b0dc2fb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -75,12 +75,9 @@ public class HiveResolver extends Plugin implements ReadResolver {
protected static final String COLLECTION_DELIM = ",";
protected String collectionDelim;
protected String mapkeyDelim;
- //private SerDe deserializer;
protected SerDe deserializer;
private List<OneField> partitionFields;
- //private String serdeClassName;
protected String serdeClassName;
- //private String propsString;
protected String propsString;
String partitionKeys;
protected char delimiter;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index c7cfb36..8c91d47 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -93,6 +93,6 @@ public class HiveStringPassResolver extends HiveResolver {
}
void parseDelimiterChar(InputData input) {
- this.delimiter = 1;
+ delimiter = 44; //,
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/src/backend/catalog/external/externalmd.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c
index ccdbdd6..c6cfb69 100644
--- a/src/backend/catalog/external/externalmd.c
+++ b/src/backend/catalog/external/externalmd.c
@@ -659,7 +659,7 @@ static Datum GetFormatOptionsForProfile(const List *outputFormats)
initStringInfo(&formatStr);
if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0)
{
- appendStringInfo(&formatStr, "delimiter '\x01' null '\N' escape '\'");
+ appendStringInfo(&formatStr, "delimiter '\x2C' null '\\N' escape '\\'");
} else {
appendStringInfo(&formatStr, "formatter 'pxfwritable_import'");
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index c5c217c..358c858 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -564,7 +564,7 @@ void free_token_resources(PxfInputData *inputData)
static void assign_optimal_supported_profile(char *profile, char *fmttype, char **supportedProfile, char **supportedFormat)
{
- if (fmttype_is_text(*fmttype) && ((strcmp(profile, "HiveText") == 0) || (strcmp(profile, "HiveRc") == 0)))
+ if (fmttype_is_text(*fmttype) && ((strcmp(profile, HiveTextProfileName) == 0) || (strcmp(profile, HiveRCProfileName) == 0)))
{
*supportedFormat = "TEXT";
*supportedProfile = profile;
@@ -575,6 +575,6 @@ static void assign_optimal_supported_profile(char *profile, char *fmttype, char
} else
{
*supportedFormat = "GPDBWritable";
- *supportedProfile = "Hive";
+ *supportedProfile = HiveProfileName;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8fe27032/src/include/access/hd_work_mgr.h
----------------------------------------------------------------------
diff --git a/src/include/access/hd_work_mgr.h b/src/include/access/hd_work_mgr.h
index cab8ca7..ea4c6ef 100644
--- a/src/include/access/hd_work_mgr.h
+++ b/src/include/access/hd_work_mgr.h
@@ -48,5 +48,7 @@ PxfFragmentStatsElem *get_pxf_fragments_statistics(char *uri, Relation rel);
List *get_pxf_item_metadata(char *profile, char *pattern, Oid dboid);
#define HiveProfileName "Hive"
+#define HiveTextProfileName "HiveText"
+#define HiveRCProfileName "HiveRC"
#endif /* HDWORKMGR_H */
[3/4] incubator-hawq git commit: HAWQ-1228. Support custom delimiter,
flat serialization of complex types in HiveText profile.
Posted by od...@apache.org.
HAWQ-1228. Support custom delimiter, flat serialization of complex types in HiveText profile.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8ba2e161
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8ba2e161
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8ba2e161
Branch: refs/heads/HAWQ-1228
Commit: 8ba2e16180064b92d7b1d8a987a63a0ba6aff704
Parents: 8fe2703
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Thu Jan 19 17:20:20 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Thu Jan 19 17:20:20 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/hawq/pxf/api/Metadata.java | 14 ++++++--
.../hawq/pxf/api/utilities/InputData.java | 2 ++
.../plugins/hive/HiveColumnarSerdeResolver.java | 1 -
.../pxf/plugins/hive/HiveMetadataFetcher.java | 20 +++++++++++-
.../hawq/pxf/plugins/hive/HiveResolver.java | 6 ++--
.../plugins/hive/HiveStringPassResolver.java | 18 ++++++++---
.../hawq/pxf/plugins/hive/HiveUserData.java | 34 +++++++++++++++++---
.../plugins/hive/utilities/HiveUtilities.java | 9 ++++--
.../pxf/plugins/hive/HiveORCAccessorTest.java | 2 +-
.../apache/hawq/pxf/service/ProfileFactory.java | 1 +
src/backend/catalog/external/externalmd.c | 26 +++++++++++----
src/include/catalog/external/itemmd.h | 2 ++
12 files changed, 106 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index 7e3b92e..a920e4f 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.api;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
@@ -124,18 +125,25 @@ public class Metadata {
* Item's fields
*/
private List<Metadata.Field> fields;
-
-
private Set<OutputFormat> outputFormats;
+ private Map<String, String> outputParameters;
public Set<OutputFormat> getOutputFormats() {
return outputFormats;
}
- public void setFormats(Set<OutputFormat> outputFormats) {
+ public void setOutputFormats(Set<OutputFormat> outputFormats) {
this.outputFormats = outputFormats;
}
+ public Map<String, String> getOutputParameters() {
+ return outputParameters;
+ }
+
+ public void setOutputParameters(Map<String, String> outputParameters) {
+ this.outputParameters = outputParameters;
+ }
+
/**
* Constructs an item's Metadata.
*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 5afedca..0ecb9eb 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -31,6 +31,8 @@ import java.util.*;
*/
public class InputData {
+ public static final String DELIMITER_KEY = "DELIMITER";
+
public static final int INVALID_SPLIT_IDX = -1;
private static final Log LOG = LogFactory.getLog(InputData.class);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 8fd37e0..157f723 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -59,7 +59,6 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
*/
public class HiveColumnarSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
- //private ColumnarSerDeBase deserializer;
private boolean firstColumn;
private StringBuilder builder;
private StringBuilder parts;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 76e8e18..90943fc 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -21,8 +21,11 @@ package org.apache.hawq.pxf.plugins.hive;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -32,6 +35,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hawq.pxf.api.Metadata;
@@ -48,6 +52,11 @@ import org.apache.hawq.pxf.service.ProfileFactory;
*/
public class HiveMetadataFetcher extends MetadataFetcher {
+ private static final String DELIM_COLLECTION = "DELIM.COLLECTION";
+ private static final String DELIM_MAPKEY = "DELIM.MAPKEY";
+ private static final String DELIM_LINE = "DELIM.LINE";
+ private static final String DELIM_FIELD = InputData.DELIMITER_KEY;
+
private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
private HiveMetaStoreClient client;
private JobConf jobConf;
@@ -108,7 +117,16 @@ public class HiveMetadataFetcher extends MetadataFetcher {
OutputFormat outputFormat = getOutputFormat(inputFormat);
formats.add(outputFormat);
}
- metadata.setFormats(formats);
+ metadata.setOutputFormats(formats);
+ if (tbl != null && tbl.getSd() != null && tbl.getSd().getSerdeInfo() != null) {
+ Map<String, String> outputParameters = new HashMap<String, String>();
+ Map<String, String> serdeParameters = tbl.getSd().getSerdeInfo().getParameters();
+ //outputParameters.put(DELIM_COLLECTION, String.valueOf((int) serdeParameters.get(serdeConstants.COLLECTION_DELIM).charAt(0)));
+ //outputParameters.put(DELIM_MAPKEY, String.valueOf((int) serdeParameters.get(serdeConstants.MAPKEY_DELIM).charAt(0)));
+ //outputParameters.put(DELIM_LINE, String.valueOf((int) serdeParameters.get(serdeConstants.LINE_DELIM).charAt(0)));
+ outputParameters.put(DELIM_FIELD, String.valueOf((int) serdeParameters.get(serdeConstants.FIELD_DELIM).charAt(0)));
+ metadata.setOutputParameters(outputParameters);
+ }
} catch (UnsupportedTypeException | UnsupportedOperationException e) {
if(ignoreErrors) {
LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index b0dc2fb..16b08d7 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -134,8 +134,6 @@ public class HiveResolver extends Plugin implements ReadResolver {
/* Parses user data string (arrived from fragmenter). */
void parseUserData(InputData input) throws Exception {
- final int EXPECTED_NUM_OF_TOKS = 5;
-
HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
serdeClassName = hiveUserData.getSerdeClassName();
@@ -620,10 +618,10 @@ public class HiveResolver extends Plugin implements ReadResolver {
*/
void parseDelimiterChar(InputData input) {
- String userDelim = input.getUserProperty("DELIMITER");
+ String userDelim = input.getUserProperty(InputData.DELIMITER_KEY);
if (userDelim == null) {
- throw new IllegalArgumentException("DELIMITER is a required option");
+ throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option");
}
final int VALID_LENGTH = 1;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index 8c91d47..65bce98 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -23,6 +23,7 @@ package org.apache.hawq.pxf.plugins.hive;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
import org.apache.hawq.pxf.service.utilities.ProtocolData;
@@ -50,6 +51,8 @@ public class HiveStringPassResolver extends HiveResolver {
parts = new StringBuilder();
partitionKeys = hiveUserData.getPartitionKeys();
serdeClassName = hiveUserData.getSerdeClassName();
+ collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM : input.getUserProperty("COLLECTION_DELIM");
+ mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM : input.getUserProperty("MAPKEY_DELIM");
/* Needed only for BINARY format*/
if (((ProtocolData) inputData).outputFormat() == OutputFormat.BINARY) {
@@ -84,15 +87,22 @@ public class HiveStringPassResolver extends HiveResolver {
public List<OneField> getFields(OneRow onerow) throws Exception {
if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
String line = (onerow.getData()).toString();
-
+ String replacedLine = replaceComplexSpecCharacters(line);
/* We follow Hive convention. Partition fields are always added at the end of the record */
- return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), replacedLine + parts));
} else {
return super.getFields(onerow);
}
}
- void parseDelimiterChar(InputData input) {
- delimiter = 44; //,
+ private String replaceComplexSpecCharacters(String line) throws UserDataException {
+ HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(inputData);
+ char collectionDelimChar = (char)Integer.valueOf(hiveUserData.getCollectionDelim()).intValue();
+ char mapKeyDelimChar = (char)Integer.valueOf(hiveUserData.getMapKeyDelim()).intValue();
+ String replacedLine = line;
+ replacedLine = line.replace(Character.toString(collectionDelimChar), collectionDelim);
+ replacedLine = replacedLine.replace(Character.toString(mapKeyDelimChar), mapkeyDelim);
+ return replacedLine;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
index 710700a..07159ca 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
@@ -25,12 +25,17 @@ public class HiveUserData {
public HiveUserData(String inputFormatName, String serdeClassName,
String propertiesString, String partitionKeys,
- boolean filterInFragmenter) {
+ boolean filterInFragmenter,
+ String collectionDelim,
+ String mapKeyDelim) {
+
this.inputFormatName = inputFormatName;
this.serdeClassName = serdeClassName;
this.propertiesString = propertiesString;
this.partitionKeys = partitionKeys;
this.filterInFragmenter = filterInFragmenter;
+ this.collectionDelim = collectionDelim;
+ this.mapKeyDelim = mapKeyDelim;
}
public String getInputFormatName() {
@@ -53,19 +58,38 @@ public class HiveUserData {
return filterInFragmenter;
}
+ public String getCollectionDelim() {
+ return collectionDelim;
+ }
+
+ public void setCollectionDelim(String collectionDelim) {
+ this.collectionDelim = collectionDelim;
+ }
+
+ public String getMapKeyDelim() {
+ return mapKeyDelim;
+ }
+
+ public void setMapKeyDelim(String mapKeyDelim) {
+ this.mapKeyDelim = mapKeyDelim;
+ }
+
private String inputFormatName;
private String serdeClassName;
private String propertiesString;
private String partitionKeys;
private boolean filterInFragmenter;
+ private String collectionDelim;
+ private String mapKeyDelim;
@Override
public String toString() {
- return inputFormatName + HiveUserData.HIVE_UD_DELIM
+ return inputFormatName + HiveUserData.HIVE_UD_DELIM
+ serdeClassName + HiveUserData.HIVE_UD_DELIM
+ propertiesString + HiveUserData.HIVE_UD_DELIM
- + partitionKeys + HiveUserData.HIVE_UD_DELIM
- + filterInFragmenter;
+ + partitionKeys + HiveUserData.HIVE_UD_DELIM
+ + filterInFragmenter + HiveUserData.HIVE_UD_DELIM
+ + collectionDelim + HiveUserData.HIVE_UD_DELIM
+ + mapKeyDelim;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index b78c379..f8d12ab 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.Metadata;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
@@ -99,7 +100,7 @@ public class HiveUtilities {
static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
- private static final int EXPECTED_NUM_OF_TOKS = 5;
+ private static final int EXPECTED_NUM_OF_TOKS = 7;
/**
* Initializes the HiveMetaStoreClient
@@ -451,12 +452,14 @@ public class HiveUtilities {
String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib();
String propertiesString = serializeProperties(partData.properties);
String partitionKeys = serializePartitionKeys(partData);
+ String collectionDelim = String.valueOf((int) partData.storageDesc.getSerdeInfo().getParameters().get(serdeConstants.COLLECTION_DELIM).charAt(0));
+ String mapKeyDelim = String.valueOf((int) partData.storageDesc.getSerdeInfo().getParameters().get(serdeConstants.MAPKEY_DELIM).charAt(0));
if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) {
assertFileType(inputFormatName, partData);
}
- hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter);
+ hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter, collectionDelim, mapKeyDelim);
return hiveUserData.toString().getBytes();
}
@@ -470,7 +473,7 @@ public class HiveUtilities {
+ EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
}
- HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]));
+ HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]), toks[5], toks[6]);
if (supportedSerdes.length > 0) {
/* Make sure this serde is supported */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 1d90d01..30233a4 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -64,7 +64,7 @@ public class HiveORCAccessorTest {
PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf);
PowerMockito.mockStatic(HiveUtilities.class);
- PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true));
+ PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "2", "3"));
PowerMockito.mockStatic(HdfsUtilities.class);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
index d053760..092f89e 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hawq.pxf.api.Metadata;
public class ProfileFactory {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/src/backend/catalog/external/externalmd.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c
index c6cfb69..4150f42 100644
--- a/src/backend/catalog/external/externalmd.c
+++ b/src/backend/catalog/external/externalmd.c
@@ -58,7 +58,7 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem);
static void LoadColumns(Oid relid, List *columns);
static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod);
static Datum GetFormatTypeForProfile(const List *outputFormats);
-static Datum GetFormatOptionsForProfile(const List *outputFormats);
+static Datum GetFormatOptionsForProfile(const List *outputFormats, int delimiter);
const int maxNumTypeModifiers = 2;
/*
@@ -126,7 +126,6 @@ static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile)
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("Could not parse PXF item, expected not null value for attribute \"name\"")));
-
pxfItem->profile = profile;
pxfItem->path = pstrdup(json_object_get_string(itemPath));
pxfItem->name = pstrdup(json_object_get_string(itemName));
@@ -146,6 +145,14 @@ static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile)
}
}
+ /* parse delimiter */
+ struct json_object *jsonOutputParameters = json_object_object_get(pxfMD, "outputParameters");
+ if (NULL != jsonOutputParameters)
+ {
+ struct json_object *outputParameterDelimiter = json_object_object_get(jsonOutputParameters, "DELIMITER");
+ pxfItem->delimiter = atoi(pstrdup(json_object_get_string(outputParameterDelimiter)));
+ }
+
elog(DEBUG1, "Parsed item %s, namespace %s", itemName, itemPath);
/* parse columns */
@@ -466,8 +473,8 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem)
* pxf://<ip:port/nameservice>/<hive db>.<hive table>?Profile=Hive */
StringInfoData locationStr;
initStringInfo(&locationStr);
- appendStringInfo(&locationStr, "pxf://%s/%s.%s?Profile=%s",
- pxf_service_address, pxfItem->path, pxfItem->name, pxfItem->profile);
+ appendStringInfo(&locationStr, "pxf://%s/%s.%s?Profile=%s&delimiter=%cx%02x",
+ pxf_service_address, pxfItem->path, pxfItem->name, pxfItem->profile, '\\', pxfItem->delimiter);
Size len = VARHDRSZ + locationStr.len;
/* +1 leaves room for sprintf's trailing null */
text *t = (text *) palloc(len + 1);
@@ -484,7 +491,7 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem)
values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid);
values[Anum_pg_exttable_location - 1] = location;
values[Anum_pg_exttable_fmttype - 1] = GetFormatTypeForProfile(pxfItem->outputFormats);
- values[Anum_pg_exttable_fmtopts - 1] = GetFormatOptionsForProfile(pxfItem->outputFormats);
+ values[Anum_pg_exttable_fmtopts - 1] = GetFormatOptionsForProfile(pxfItem->outputFormats, pxfItem->delimiter);
nulls[Anum_pg_exttable_command - 1] = true;
nulls[Anum_pg_exttable_rejectlimit - 1] = true;
nulls[Anum_pg_exttable_rejectlimittype - 1] = true;
@@ -653,13 +660,18 @@ static Datum GetFormatTypeForProfile(const List *outputFormats)
}
}
-static Datum GetFormatOptionsForProfile(const List *outputFormats)
+static Datum GetFormatOptionsForProfile(const List *outputFormats, int delimiter)
{
StringInfoData formatStr;
initStringInfo(&formatStr);
+ /* "delimiter 'delimiter' null '\\N' escape '\\'"*/
+ char formatArr[35] = { 0x64, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65,
+ 0x72, 0x20, 0x27, delimiter, 0x27, 0x20, 0x6e, 0x75, 0x6c, 0x6c,
+ 0x20, 0x27, 0x5c, 0x4e, 0x27, 0x20, 0x65, 0x73, 0x63, 0x61, 0x70,
+ 0x65, 0x20, 0x27, 0x5c, 0x27, 0x00 };
if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0)
{
- appendStringInfo(&formatStr, "delimiter '\x2C' null '\\N' escape '\\'");
+ appendStringInfo(&formatStr, "%s", formatArr);
} else {
appendStringInfo(&formatStr, "formatter 'pxfwritable_import'");
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8ba2e161/src/include/catalog/external/itemmd.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/external/itemmd.h b/src/include/catalog/external/itemmd.h
index 5717a53..d9f8721 100644
--- a/src/include/catalog/external/itemmd.h
+++ b/src/include/catalog/external/itemmd.h
@@ -70,6 +70,8 @@ typedef struct PxfItem
/* output formats*/
List *outputFormats;
+
+ int delimiter;
} PxfItem;
[4/4] incubator-hawq git commit: HAWQ-1228. Fixed complex types for
HiveText.
Posted by od...@apache.org.
HAWQ-1228. Fixed complex types for HiveText.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/607184c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/607184c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/607184c3
Branch: refs/heads/HAWQ-1228
Commit: 607184c365d872ce9b567ee2366122aac903c4a7
Parents: 8ba2e16
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Mon Jan 23 18:32:21 2017 -0800
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Mon Jan 23 18:32:21 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/hawq/pxf/api/Metadata.java | 18 +++-
.../org/apache/hawq/pxf/api/MetadataTest.java | 2 +-
.../plugins/hive/HiveColumnarSerdeResolver.java | 5 --
.../pxf/plugins/hive/HiveDataFragmenter.java | 25 +++---
.../pxf/plugins/hive/HiveMetadataFetcher.java | 63 +++----------
.../hawq/pxf/plugins/hive/HiveResolver.java | 66 +++++++-------
.../plugins/hive/HiveStringPassResolver.java | 13 +--
.../hawq/pxf/plugins/hive/HiveUserData.java | 20 ++++-
.../hive/utilities/EnumHiveToHawqType.java | 31 +++++--
.../plugins/hive/utilities/HiveUtilities.java | 94 ++++++++++++++++++--
.../pxf/plugins/hive/HiveORCAccessorTest.java | 2 +-
.../pxf/service/MetadataResponseFormatter.java | 3 +-
.../apache/hawq/pxf/service/ProfileFactory.java | 9 +-
.../service/MetadataResponseFormatterTest.java | 16 ++--
src/backend/catalog/external/externalmd.c | 65 +++++++++-----
15 files changed, 268 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index a920e4f..8701813 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -78,6 +78,7 @@ public class Metadata {
public static class Field {
private String name;
private EnumHawqType type; // field type which PXF exposes
+ private boolean isComplexType; // whether source field's type is complex
private String sourceType; // field type PXF reads from
private String[] modifiers; // type modifiers, optional field
@@ -93,12 +94,17 @@ public class Metadata {
this.sourceType = sourceType;
}
- public Field(String name, EnumHawqType type, String sourceType,
- String[] modifiers) {
+ public Field(String name, EnumHawqType type, String sourceType, String[] modifiers) {
this(name, type, sourceType);
this.modifiers = modifiers;
}
+ public Field(String name, EnumHawqType type, boolean isComplexType, String sourceType, String[] modifiers) {
+ this(name, type, sourceType);
+ this.modifiers = modifiers;
+ this.isComplexType = isComplexType;
+ }
+
public String getName() {
return name;
}
@@ -114,6 +120,14 @@ public class Metadata {
public String[] getModifiers() {
return modifiers;
}
+
+ public boolean isComplexType() {
+ return isComplexType;
+ }
+
+ public void setComplexType(boolean isComplexType) {
+ this.isComplexType = isComplexType;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
index 327a15b..9244ba2 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
@@ -32,7 +32,7 @@ public class MetadataTest {
@Test
public void createFieldEmptyNameType() {
try {
- Metadata.Field field = new Metadata.Field(null, null, null, null);
+ Metadata.Field field = new Metadata.Field(null, null, false, null, null);
fail("Empty name, type and source type shouldn't be allowed.");
} catch (IllegalArgumentException e) {
assertEquals("Field name, type and source type cannot be empty", e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 157f723..5ef4edc 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -237,9 +237,4 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
}
firstColumn = false;
}
-
- @Override
- void parseDelimiterChar(InputData input) {
- delimiter = 44; //,
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 97f278d..6217ce2 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -162,6 +162,10 @@ public class HiveDataFragmenter extends Fragmenter {
Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
+ Metadata metadata = new Metadata(tblDesc);
+ HiveUtilities.getSchema(tbl, metadata);
+ boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata);
+
verifySchema(tbl);
List<Partition> partitions = null;
@@ -227,7 +231,7 @@ public class HiveDataFragmenter extends Fragmenter {
if (partitions.isEmpty()) {
props = getSchema(tbl);
- fetchMetaDataForSimpleTable(descTable, props);
+ fetchMetaDataForSimpleTable(descTable, props, hasComplexTypes);
} else {
List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
@@ -238,7 +242,7 @@ public class HiveDataFragmenter extends Fragmenter {
tblDesc.getPath(), tblDesc.getName(),
partitionKeys);
fetchMetaDataForPartitionedTable(descPartition, props,
- partition, partitionKeys, tblDesc.getName());
+ partition, partitionKeys, tblDesc.getName(), hasComplexTypes);
}
}
}
@@ -254,29 +258,30 @@ public class HiveDataFragmenter extends Fragmenter {
}
private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
- Properties props) throws Exception {
- fetchMetaDataForSimpleTable(stdsc, props, null);
+ Properties props, boolean hasComplexTypes) throws Exception {
+ fetchMetaDataForSimpleTable(stdsc, props, null, hasComplexTypes);
}
private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
- Properties props, String tableName)
+ Properties props, String tableName, boolean hasComplexTypes)
throws Exception {
fetchMetaData(new HiveTablePartition(stdsc, props, null, null,
- tableName));
+ tableName), hasComplexTypes);
}
private void fetchMetaDataForPartitionedTable(StorageDescriptor stdsc,
Properties props,
Partition partition,
List<FieldSchema> partitionKeys,
- String tableName)
+ String tableName,
+ boolean hasComplexTypes)
throws Exception {
fetchMetaData(new HiveTablePartition(stdsc, props, partition,
- partitionKeys, tableName));
+ partitionKeys, tableName), hasComplexTypes);
}
/* Fills a table partition */
- private void fetchMetaData(HiveTablePartition tablePartition)
+ private void fetchMetaData(HiveTablePartition tablePartition, boolean hasComplexTypes)
throws Exception {
InputFormat<?, ?> fformat = makeInputFormat(
tablePartition.storageDesc.getInputFormat(), jobConf);
@@ -284,7 +289,7 @@ public class HiveDataFragmenter extends Fragmenter {
if (inputData.getProfile() != null) {
// evaluate optimal profile based on file format if profile was explicitly specified in url
// if user passed accessor+fragmenter+resolver - use them
- profile = ProfileFactory.get(fformat);
+ profile = ProfileFactory.get(fformat, hasComplexTypes);
}
String fragmenterForProfile = null;
if (profile != null) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 90943fc..6dcd329 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -101,32 +101,28 @@ public class HiveMetadataFetcher extends MetadataFetcher {
try {
Metadata metadata = new Metadata(tblDesc);
Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
- getSchema(tbl, metadata);
+ HiveUtilities.getSchema(tbl, metadata);
+ boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata);
metadataList.add(metadata);
List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1);
Set<OutputFormat> formats = new HashSet<OutputFormat>();
//If table has partitions - find out all formats
for (Partition tablePartition : tablePartitions) {
String inputFormat = tablePartition.getSd().getInputFormat();
- OutputFormat outputFormat = getOutputFormat(inputFormat);
+ OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes);
formats.add(outputFormat);
}
//If table has no partitions - get single format of table
if (tablePartitions.size() == 0 ) {
String inputFormat = tbl.getSd().getInputFormat();
- OutputFormat outputFormat = getOutputFormat(inputFormat);
+ OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes);
formats.add(outputFormat);
}
metadata.setOutputFormats(formats);
- if (tbl != null && tbl.getSd() != null && tbl.getSd().getSerdeInfo() != null) {
- Map<String, String> outputParameters = new HashMap<String, String>();
- Map<String, String> serdeParameters = tbl.getSd().getSerdeInfo().getParameters();
- //outputParameters.put(DELIM_COLLECTION, String.valueOf((int) serdeParameters.get(serdeConstants.COLLECTION_DELIM).charAt(0)));
- //outputParameters.put(DELIM_MAPKEY, String.valueOf((int) serdeParameters.get(serdeConstants.MAPKEY_DELIM).charAt(0)));
- //outputParameters.put(DELIM_LINE, String.valueOf((int) serdeParameters.get(serdeConstants.LINE_DELIM).charAt(0)));
- outputParameters.put(DELIM_FIELD, String.valueOf((int) serdeParameters.get(serdeConstants.FIELD_DELIM).charAt(0)));
- metadata.setOutputParameters(outputParameters);
- }
+ Map<String, String> outputParameters = new HashMap<String, String>();
+ if (HiveUtilities.getDelimiter(tbl.getSd()) != null)
+ outputParameters.put(DELIM_FIELD, HiveUtilities.getDelimiter(tbl.getSd()));
+ metadata.setOutputParameters(outputParameters);
} catch (UnsupportedTypeException | UnsupportedOperationException e) {
if(ignoreErrors) {
LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage());
@@ -140,50 +136,11 @@ public class HiveMetadataFetcher extends MetadataFetcher {
return metadataList;
}
-
- /**
- * Populates the given metadata object with the given table's fields and partitions,
- * The partition fields are added at the end of the table schema.
- * Throws an exception if the table contains unsupported field types.
- * Supported HCatalog types: TINYINT,
- * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP,
- * DATE, DECIMAL, VARCHAR, CHAR.
- *
- * @param tbl Hive table
- * @param metadata schema of given table
- */
- private void getSchema(Table tbl, Metadata metadata) {
-
- int hiveColumnsSize = tbl.getSd().getColsSize();
- int hivePartitionsSize = tbl.getPartitionKeysSize();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
- }
-
- // check hive fields
- try {
- List<FieldSchema> hiveColumns = tbl.getSd().getCols();
- for (FieldSchema hiveCol : hiveColumns) {
- metadata.addField(HiveUtilities.mapHiveType(hiveCol));
- }
- // check partition fields
- List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
- for (FieldSchema hivePart : hivePartitions) {
- metadata.addField(HiveUtilities.mapHiveType(hivePart));
- }
- } catch (UnsupportedTypeException e) {
- String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " +
- e.getMessage();
- throw new UnsupportedTypeException(errorMsg);
- }
- }
-
- private OutputFormat getOutputFormat(String inputFormat) {
+ private OutputFormat getOutputFormat(String inputFormat, boolean hasComplexTypes) {
OutputFormat outputFormat = null;
try {
InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf);
- String profile = ProfileFactory.get(fformat);
+ String profile = ProfileFactory.get(fformat, hasComplexTypes);
String outputFormatString = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT");
outputFormat = OutputFormat.valueOf(outputFormatString);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index 16b08d7..3e76187 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -621,44 +621,46 @@ public class HiveResolver extends Plugin implements ReadResolver {
String userDelim = input.getUserProperty(InputData.DELIMITER_KEY);
if (userDelim == null) {
- throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option");
- }
-
- final int VALID_LENGTH = 1;
- final int VALID_LENGTH_HEX = 4;
-
- if (userDelim.startsWith("\\x")) { // hexadecimal sequence
-
- if (userDelim.length() != VALID_LENGTH_HEX) {
+ /* No DELIMITER in URL, try to get it from fragment's user data*/
+ HiveUserData hiveUserData = null;
+ try {
+ hiveUserData = HiveUtilities.parseHiveUserData(input);
+ } catch (UserDataException ude) {
+ throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option");
+ }
+ if (hiveUserData.getDelimiter() == null) {
+ throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option");
+ }
+ delimiter = (char) Integer.valueOf(hiveUserData.getDelimiter()).intValue();
+ } else {
+ final int VALID_LENGTH = 1;
+ final int VALID_LENGTH_HEX = 4;
+ if (userDelim.startsWith("\\x")) { // hexadecimal sequence
+ if (userDelim.length() != VALID_LENGTH_HEX) {
+ throw new IllegalArgumentException(
+ "Invalid hexdecimal value for delimiter (got"
+ + userDelim + ")");
+ }
+ delimiter = (char) Integer.parseInt(
+ userDelim.substring(2, VALID_LENGTH_HEX), 16);
+ if (!CharUtils.isAscii(delimiter)) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+ + delimiter + ")");
+ }
+ return;
+ }
+ if (userDelim.length() != VALID_LENGTH) {
throw new IllegalArgumentException(
- "Invalid hexdecimal value for delimiter (got"
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
+ userDelim + ")");
}
-
- delimiter = (char) Integer.parseInt(
- userDelim.substring(2, VALID_LENGTH_HEX), 16);
-
- if (!CharUtils.isAscii(delimiter)) {
+ if (!CharUtils.isAscii(userDelim.charAt(0))) {
throw new IllegalArgumentException(
"Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
- + delimiter + ")");
+ + userDelim + ")");
}
-
- return;
+ delimiter = userDelim.charAt(0);
}
-
- if (userDelim.length() != VALID_LENGTH) {
- throw new IllegalArgumentException(
- "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
- + userDelim + ")");
- }
-
- if (!CharUtils.isAscii(userDelim.charAt(0))) {
- throw new IllegalArgumentException(
- "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
- + userDelim + ")");
- }
-
- delimiter = userDelim.charAt(0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index 65bce98..acbff27 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -87,22 +87,11 @@ public class HiveStringPassResolver extends HiveResolver {
public List<OneField> getFields(OneRow onerow) throws Exception {
if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
String line = (onerow.getData()).toString();
- String replacedLine = replaceComplexSpecCharacters(line);
/* We follow Hive convention. Partition fields are always added at the end of the record */
- return Collections.singletonList(new OneField(VARCHAR.getOID(), replacedLine + parts));
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
} else {
return super.getFields(onerow);
}
}
- private String replaceComplexSpecCharacters(String line) throws UserDataException {
- HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(inputData);
- char collectionDelimChar = (char)Integer.valueOf(hiveUserData.getCollectionDelim()).intValue();
- char mapKeyDelimChar = (char)Integer.valueOf(hiveUserData.getMapKeyDelim()).intValue();
- String replacedLine = line;
- replacedLine = line.replace(Character.toString(collectionDelimChar), collectionDelim);
- replacedLine = replacedLine.replace(Character.toString(mapKeyDelimChar), mapkeyDelim);
- return replacedLine;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
index 07159ca..2437b60 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
@@ -27,15 +27,17 @@ public class HiveUserData {
String propertiesString, String partitionKeys,
boolean filterInFragmenter,
String collectionDelim,
- String mapKeyDelim) {
+ String mapKeyDelim,
+ String delimiter) {
this.inputFormatName = inputFormatName;
this.serdeClassName = serdeClassName;
this.propertiesString = propertiesString;
this.partitionKeys = partitionKeys;
this.filterInFragmenter = filterInFragmenter;
- this.collectionDelim = collectionDelim;
- this.mapKeyDelim = mapKeyDelim;
+ this.collectionDelim = (collectionDelim == null ? "0" : collectionDelim);
+ this.mapKeyDelim = (mapKeyDelim == null ? "0" : mapKeyDelim);
+ this.delimiter = (delimiter == null ? "0" : delimiter);
}
public String getInputFormatName() {
@@ -74,6 +76,14 @@ public class HiveUserData {
this.mapKeyDelim = mapKeyDelim;
}
+ public String getDelimiter() {
+ return delimiter;
+ }
+
+ public void setDelimiter(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
private String inputFormatName;
private String serdeClassName;
private String propertiesString;
@@ -81,6 +91,7 @@ public class HiveUserData {
private boolean filterInFragmenter;
private String collectionDelim;
private String mapKeyDelim;
+ private String delimiter;
@Override
public String toString() {
@@ -90,6 +101,7 @@ public class HiveUserData {
+ partitionKeys + HiveUserData.HIVE_UD_DELIM
+ filterInFragmenter + HiveUserData.HIVE_UD_DELIM
+ collectionDelim + HiveUserData.HIVE_UD_DELIM
- + mapKeyDelim;
+ + mapKeyDelim + HiveUserData.HIVE_UD_DELIM
+ + delimiter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
index d91e949..ea65a66 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java
@@ -42,37 +42,48 @@ public enum EnumHiveToHawqType {
FloatType("float", EnumHawqType.Float4Type),
DoubleType("double", EnumHawqType.Float8Type),
StringType("string", EnumHawqType.TextType),
- BinaryType("binary", EnumHawqType.ByteaType),
+ BinaryType("binary", EnumHawqType.ByteaType, true),
TimestampType("timestamp", EnumHawqType.TimestampType),
DateType("date", EnumHawqType.DateType),
DecimalType("decimal", EnumHawqType.NumericType, "[(,)]"),
VarcharType("varchar", EnumHawqType.VarcharType, "[(,)]"),
CharType("char", EnumHawqType.BpcharType, "[(,)]"),
- ArrayType("array", EnumHawqType.TextType, "[<,>]"),
- MapType("map", EnumHawqType.TextType, "[<,>]"),
- StructType("struct", EnumHawqType.TextType, "[<,>]"),
- UnionType("uniontype", EnumHawqType.TextType, "[<,>]");
+ ArrayType("array", EnumHawqType.TextType, "[<,>]", true),
+ MapType("map", EnumHawqType.TextType, "[<,>]", true),
+ StructType("struct", EnumHawqType.TextType, "[<,>]", true),
+ UnionType("uniontype", EnumHawqType.TextType, "[<,>]", true);
private String typeName;
private EnumHawqType hawqType;
private String splitExpression;
private byte size;
+ private boolean isComplexType;
EnumHiveToHawqType(String typeName, EnumHawqType hawqType) {
this.typeName = typeName;
this.hawqType = hawqType;
}
-
+
EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) {
this(typeName, hawqType);
this.size = size;
}
+ EnumHiveToHawqType(String typeName, EnumHawqType hawqType, boolean isComplexType) {
+ this(typeName, hawqType);
+ this.isComplexType = isComplexType;
+ }
+
EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) {
this(typeName, hawqType);
this.splitExpression = splitExpression;
}
+ EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression, boolean isComplexType) {
+ this(typeName, hawqType, splitExpression);
+ this.isComplexType = isComplexType;
+ }
+
/**
*
* @return name of type
@@ -216,4 +227,12 @@ public enum EnumHiveToHawqType {
return size;
}
+ public boolean isComplexType() {
+ return isComplexType;
+ }
+
+ public void setComplexType(boolean isComplexType) {
+ this.isComplexType = isComplexType;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index f8d12ab..d94bf12 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -35,10 +35,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.Metadata.Field;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.EnumHawqType;
@@ -100,7 +102,8 @@ public class HiveUtilities {
static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
- private static final int EXPECTED_NUM_OF_TOKS = 7;
+ private static final int EXPECTED_NUM_OF_TOKS = 8;
+ private static final String DEFAULT_DELIMITER = "44";
/**
* Initializes the HiveMetaStoreClient
@@ -195,7 +198,7 @@ public class HiveUtilities {
} else
hiveTypeName = hiveType;
- return new Metadata.Field(fieldName, hawqType, hiveTypeName, modifiers);
+ return new Metadata.Field(fieldName, hawqType, hiveToHawqType.isComplexType(), hiveTypeName, modifiers);
}
/**
@@ -452,28 +455,29 @@ public class HiveUtilities {
String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib();
String propertiesString = serializeProperties(partData.properties);
String partitionKeys = serializePartitionKeys(partData);
- String collectionDelim = String.valueOf((int) partData.storageDesc.getSerdeInfo().getParameters().get(serdeConstants.COLLECTION_DELIM).charAt(0));
- String mapKeyDelim = String.valueOf((int) partData.storageDesc.getSerdeInfo().getParameters().get(serdeConstants.MAPKEY_DELIM).charAt(0));
+ String collectionDelim = getCollectionDelim(partData.storageDesc);
+ String mapKeyDelim = getMapKeyDelim(partData.storageDesc);
+ String delimiter = getDelimiter(partData.storageDesc);
if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) {
assertFileType(inputFormatName, partData);
}
- hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter, collectionDelim, mapKeyDelim);
+ hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter, collectionDelim, mapKeyDelim, delimiter);
return hiveUserData.toString().getBytes();
}
public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... supportedSerdes) throws UserDataException{
String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM);
+ String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM, EXPECTED_NUM_OF_TOKS);
if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
throw new UserDataException("HiveInputFormatFragmenter expected "
+ EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
}
- HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]), toks[5], toks[6]);
+ HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]), toks[5], toks[6], toks[7]);
if (supportedSerdes.length > 0) {
/* Make sure this serde is supported */
@@ -482,4 +486,80 @@ public class HiveUtilities {
return hiveUserData;
}
+
+ private static String getSerdeParameter(StorageDescriptor sd, String parameterKey) {
+ String parameterValue = null;
+ if (sd != null && sd.getSerdeInfo() != null && sd.getSerdeInfo().getParameters() != null && sd.getSerdeInfo().getParameters().get(parameterKey) != null) {
+ parameterValue = String.valueOf((int) sd.getSerdeInfo().getParameters().get(parameterKey).charAt(0));
+ }
+ return parameterValue;
+ }
+
+ public static String getCollectionDelim(StorageDescriptor sd) {
+ String collectionDelim = getSerdeParameter(sd, serdeConstants.COLLECTION_DELIM);
+ return collectionDelim;
+ }
+
+ public static String getMapKeyDelim(StorageDescriptor sd) {
+ String mapKeyDelim = getSerdeParameter(sd, serdeConstants.MAPKEY_DELIM);
+ return mapKeyDelim;
+ }
+
+ public static String getDelimiter(StorageDescriptor sd) {
+ String delimiter = getSerdeParameter(sd, serdeConstants.FIELD_DELIM);
+ if (delimiter == null)
+ delimiter = DEFAULT_DELIMITER;
+ return delimiter;
+ }
+
+ public static boolean hasComplexTypes(Metadata metadata) {
+ boolean hasComplexTypes = false;
+ List<Field> fields = metadata.getFields();
+ for (Field field: fields) {
+ if (field.isComplexType()) {
+ hasComplexTypes = true;
+ break;
+ }
+ }
+
+ return hasComplexTypes;
+ }
+
+ /**
+ * Populates the given metadata object with the given table's fields and partitions.
+ * The partition fields are added at the end of the table schema.
+ * Throws an exception if the table contains unsupported field types.
+ * Supported HCatalog types: TINYINT,
+ * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP,
+ * DATE, DECIMAL, VARCHAR, CHAR.
+ *
+ * @param tbl Hive table
+ * @param metadata schema of given table
+ */
+ public static void getSchema(Table tbl, Metadata metadata) {
+
+ int hiveColumnsSize = tbl.getSd().getColsSize();
+ int hivePartitionsSize = tbl.getPartitionKeysSize();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
+ }
+
+ // check hive fields
+ try {
+ List<FieldSchema> hiveColumns = tbl.getSd().getCols();
+ for (FieldSchema hiveCol : hiveColumns) {
+ metadata.addField(HiveUtilities.mapHiveType(hiveCol));
+ }
+ // check partition fields
+ List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
+ for (FieldSchema hivePart : hivePartitions) {
+ metadata.addField(HiveUtilities.mapHiveType(hivePart));
+ }
+ } catch (UnsupportedTypeException e) {
+ String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " +
+ e.getMessage();
+ throw new UnsupportedTypeException(errorMsg);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 30233a4..5feec4f 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -64,7 +64,7 @@ public class HiveORCAccessorTest {
PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf);
PowerMockito.mockStatic(HiveUtilities.class);
- PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "2", "3"));
+ PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "2", "3", "1"));
PowerMockito.mockStatic(HdfsUtilities.class);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
index 8225ec5..d2b0b5c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
@@ -86,7 +86,8 @@ public class MetadataResponseFormatter {
result.append("Field #").append(++i).append(": [")
.append("Name: ").append(field.getName())
.append(", Type: ").append(field.getType().getTypeName())
- .append(", Source type: ").append(field.getSourceType()).append("] ");
+ .append(", Source type: ").append(field.getSourceType())
+ .append(", Source type is complex: ").append(field.isComplexType()).append("] ");
}
}
LOG.debug(result);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
index 092f89e..580cd39 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
@@ -32,9 +32,14 @@ public class ProfileFactory {
private static final String HIVE_ORC_PROFILE = "HiveORC";
private static final String HIVE_PROFILE = "Hive";
- public static String get(InputFormat inputFormat) {
+/* public static String get(InputFormat inputFormat) {
+ String profileName = get(inputFormat, false);
+ return profileName;
+ }
+*/
+ public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
String profileName = null;
- if (inputFormat instanceof TextInputFormat) {
+ if (inputFormat instanceof TextInputFormat && !hasComplexTypes) {
profileName = HIVE_TEXT_PROFILE;
} else if (inputFormat instanceof RCFileInputFormat) {
profileName = HIVE_RC_PROFILE;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/MetadataResponseFormatterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/MetadataResponseFormatterTest.java b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/MetadataResponseFormatterTest.java
index 21bf423..546b42d 100644
--- a/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/MetadataResponseFormatterTest.java
+++ b/pxf/pxf-service/src/test/java/org/apache/hawq/pxf/service/MetadataResponseFormatterTest.java
@@ -57,7 +57,7 @@ public class MetadataResponseFormatterTest {
response = MetadataResponseFormatter.formatResponse(metadataList, "path.file");
StringBuilder expected = new StringBuilder("{\"PXFMetadata\":[{");
expected.append("\"item\":{\"path\":\"default\",\"name\":\"table1\"},")
- .append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\"},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\"}]}]}");
+ .append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\",\"complexType\":false},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\",\"complexType\":false}]}]}");
assertEquals(expected.toString(), convertResponseToString(response));
}
@@ -75,7 +75,7 @@ public class MetadataResponseFormatterTest {
response = MetadataResponseFormatter.formatResponse(metadataList, "path.file");
StringBuilder expected = new StringBuilder("{\"PXFMetadata\":[{");
expected.append("\"item\":{\"path\":\"default\",\"name\":\"table1\"},")
- .append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\"},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\"}]}]}");
+ .append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\",\"complexType\":false},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\",\"complexType\":false}]}]}");
assertEquals(expected.toString(), convertResponseToString(response));
}
@@ -97,9 +97,9 @@ public class MetadataResponseFormatterTest {
StringBuilder expected = new StringBuilder("{\"PXFMetadata\":[{");
expected.append("\"item\":{\"path\":\"default\",\"name\":\"table1\"},")
.append("\"fields\":[")
- .append("{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\"},")
- .append("{\"name\":\"field2\",\"type\":\"numeric\",\"sourceType\":\"decimal\",\"modifiers\":[\"1349\",\"1789\"]},")
- .append("{\"name\":\"field3\",\"type\":\"bpchar\",\"sourceType\":\"char\",\"modifiers\":[\"50\"]}")
+ .append("{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\",\"complexType\":false},")
+ .append("{\"name\":\"field2\",\"type\":\"numeric\",\"sourceType\":\"decimal\",\"modifiers\":[\"1349\",\"1789\"],\"complexType\":false},")
+ .append("{\"name\":\"field3\",\"type\":\"bpchar\",\"sourceType\":\"char\",\"modifiers\":[\"50\"],\"complexType\":false}")
.append("]}]}");
assertEquals(expected.toString(), convertResponseToString(response));
@@ -118,7 +118,7 @@ public class MetadataResponseFormatterTest {
StringBuilder expected = new StringBuilder("{\"PXFMetadata\":[{");
expected.append("\"item\":{\"path\":\"default\",\"name\":\"table1\"},")
.append("\"fields\":[")
- .append("{\"name\":\"field1\",\"type\":\"float8\",\"sourceType\":\"double\"}")
+ .append("{\"name\":\"field1\",\"type\":\"float8\",\"sourceType\":\"double\",\"complexType\":false}")
.append("]}]}");
assertEquals(expected.toString(), convertResponseToString(response));
@@ -199,7 +199,7 @@ public class MetadataResponseFormatterTest {
expected.append(",");
}
expected.append("{\"item\":{\"path\":\"default\",\"name\":\"table").append(i).append("\"},");
- expected.append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\"},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\"}]}");
+ expected.append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\",\"complexType\":false},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\",\"complexType\":false}]}");
}
expected.append("]}");
@@ -226,7 +226,7 @@ public class MetadataResponseFormatterTest {
expected.append(",");
}
expected.append("{\"item\":{\"path\":\"default").append(i).append("\",\"name\":\"table").append(i).append("\"},");
- expected.append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\"},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\"}]}");
+ expected.append("\"fields\":[{\"name\":\"field1\",\"type\":\"int8\",\"sourceType\":\"bigint\",\"complexType\":false},{\"name\":\"field2\",\"type\":\"text\",\"sourceType\":\"string\",\"complexType\":false}]}");
}
expected.append("]}");
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/607184c3/src/backend/catalog/external/externalmd.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c
index 4150f42..0e572ae 100644
--- a/src/backend/catalog/external/externalmd.c
+++ b/src/backend/catalog/external/externalmd.c
@@ -59,6 +59,7 @@ static void LoadColumns(Oid relid, List *columns);
static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod);
static Datum GetFormatTypeForProfile(const List *outputFormats);
static Datum GetFormatOptionsForProfile(const List *outputFormats, int delimiter);
+static Datum GetLocationForFormat(char *profile, List *outputFormats, char *pxf_service_address, char *path, char *name, int delimiter);
const int maxNumTypeModifiers = 2;
/*
@@ -469,27 +470,8 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem)
values[i] = (Datum) 0;
}
- /* location - should be an array of text with one element:
- * pxf://<ip:port/namaservice>/<hive db>.<hive table>?Profile=Hive */
- StringInfoData locationStr;
- initStringInfo(&locationStr);
- appendStringInfo(&locationStr, "pxf://%s/%s.%s?Profile=%s&delimiter=%cx%02x",
- pxf_service_address, pxfItem->path, pxfItem->name, pxfItem->profile, '\\', pxfItem->delimiter);
- Size len = VARHDRSZ + locationStr.len;
- /* +1 leaves room for sprintf's trailing null */
- text *t = (text *) palloc(len + 1);
- SET_VARSIZE(t, len);
- sprintf((char *) VARDATA(t), "%s", locationStr.data);
- ArrayBuildState *astate = NULL;
- astate = accumArrayResult(astate, PointerGetDatum(t),
- false, TEXTOID,
- CurrentMemoryContext);
- pfree(locationStr.data);
- Assert(NULL != astate);
- Datum location = makeArrayResult(astate, CurrentMemoryContext);
-
values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_exttable_location - 1] = location;
+ values[Anum_pg_exttable_location - 1] = GetLocationForFormat(pxfItem->profile, pxfItem->outputFormats, pxf_service_address, pxfItem->path, pxfItem->name, pxfItem->delimiter);
values[Anum_pg_exttable_fmttype - 1] = GetFormatTypeForProfile(pxfItem->outputFormats);
values[Anum_pg_exttable_fmtopts - 1] = GetFormatOptionsForProfile(pxfItem->outputFormats, pxfItem->delimiter);
nulls[Anum_pg_exttable_command - 1] = true;
@@ -664,11 +646,13 @@ static Datum GetFormatOptionsForProfile(const List *outputFormats, int delimiter
{
StringInfoData formatStr;
initStringInfo(&formatStr);
+
/* "delimiter 'delimiter' null '\\N' escape '\\'"*/
char formatArr[35] = { 0x64, 0x65, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65,
0x72, 0x20, 0x27, delimiter, 0x27, 0x20, 0x6e, 0x75, 0x6c, 0x6c,
0x20, 0x27, 0x5c, 0x4e, 0x27, 0x20, 0x65, 0x73, 0x63, 0x61, 0x70,
0x65, 0x20, 0x27, 0x5c, 0x27, 0x00 };
+
if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0)
{
appendStringInfo(&formatStr, "%s", formatArr);
@@ -680,3 +664,44 @@ static Datum GetFormatOptionsForProfile(const List *outputFormats, int delimiter
return format_opts;
}
+/* location - should be an array of text with one element:
+ * pxf://<ip:port/nameservice>/<hive db>.<hive table>?Profile=Hive */
+static Datum GetLocationForFormat(char *profile, List *outputFormats, char *pxf_service_address, char *path, char *name, int delimiter)
+{
+ StringInfoData locationStr;
+ initStringInfo(&locationStr);
+ appendStringInfo(&locationStr, "pxf://%s/%s.%s?Profile=%s", pxf_service_address, path, name, profile);
+ bool hasTextOutputFormat = false;
+ ListCell *lc = NULL;
+ foreach (lc, outputFormats)
+ {
+ char *outputFormat = (char *) lfirst(lc);
+ if (strcmp(outputFormat, "TEXT") == 0)
+ {
+ hasTextOutputFormat = true;
+ break;
+ }
+ }
+ if (delimiter)
+ {
+ appendStringInfo(&locationStr, "&delimiter=%cx%02x", '\\', delimiter);
+ } else if (hasTextOutputFormat)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("delimiter attribute is mandatory for output format \"TEXT\"")));
+ }
+ Size len = VARHDRSZ + locationStr.len;
+ /* +1 leaves room for sprintf's trailing null */
+ text *t = (text *) palloc(len + 1);
+ SET_VARSIZE(t, len);
+ sprintf((char *) VARDATA(t), "%s", locationStr.data);
+ ArrayBuildState *astate = NULL;
+ astate = accumArrayResult(astate, PointerGetDatum(t),
+ false, TEXTOID,
+ CurrentMemoryContext);
+ pfree(locationStr.data);
+ Assert(NULL != astate);
+ Datum location = makeArrayResult(astate, CurrentMemoryContext);
+ return location;
+}