Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:09:56 UTC
[06/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace change
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
new file mode 100644
index 0000000..5de36b2
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -0,0 +1,262 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Specialized Hive fragmenter for RC and Text file tables. Unlike
+ * {@link HiveDataFragmenter}, this class does not send the serde properties to
+ * the accessors/resolvers, to avoid memory explosion in HAWQ. For RC files,
+ * use together with {@link HiveRCFileAccessor}/
+ * {@link HiveColumnarSerdeResolver}. For Text files, use together with
+ * {@link HiveLineBreakAccessor}/{@link HiveStringPassResolver}. <br>
+ * Given a Hive table and its partitions, divides the data into fragments (here
+ * a data fragment is actually an HDFS file block) and returns a list of them.
+ * Each data fragment will contain the following information:
+ * <ol>
+ * <li>sourceName: full HDFS path to the data file that this data fragment is
+ * part of</li>
+ * <li>hosts: a list of the datanode machines that hold a replica of this block</li>
+ * <li>userData: input format name, serde name, and partition keys</li>
+ * </ol>
+ */
+public class HiveInputFormatFragmenter extends HiveDataFragmenter {
+ private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
+
+ static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+ static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+ static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
+ static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
+ static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ private static final int EXPECTED_NUM_OF_TOKS = 3;
+ public static final int TOK_SERDE = 0;
+ public static final int TOK_KEYS = 1;
+ public static final int TOK_FILTER_DONE = 2;
+
+ /** Defines the Hive input formats currently supported in PXF */
+ public enum PXF_HIVE_INPUT_FORMATS {
+ RC_FILE_INPUT_FORMAT,
+ TEXT_FILE_INPUT_FORMAT
+ }
+
+ /** Defines the Hive serializers (serde classes) currently supported in PXF */
+ public enum PXF_HIVE_SERDES {
+ COLUMNAR_SERDE,
+ LAZY_BINARY_COLUMNAR_SERDE,
+ LAZY_SIMPLE_SERDE
+ }
+
+ /**
+ * Constructs a HiveInputFormatFragmenter.
+ *
+ * @param inputData all input parameters coming from the client
+ */
+ public HiveInputFormatFragmenter(InputData inputData) {
+ super(inputData, HiveInputFormatFragmenter.class);
+ }
+
+ /**
+ * Extracts the user data:
+ * serde name, partition keys, and whether the filter was applied in the fragmenter
+ *
+ * @param input input data from client
+ * @param supportedSerdes supported serde names
+ * @return parsed tokens
+ * @throws UserDataException if user data contains unsupported serde
+ * or wrong number of tokens
+ */
+ public static String[] parseToks(InputData input, String... supportedSerdes)
+ throws UserDataException {
+ String userData = new String(input.getFragmentUserData());
+ String[] toks = userData.split(HIVE_UD_DELIM);
+ if (supportedSerdes.length > 0
+ && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) {
+ throw new UserDataException(toks[TOK_SERDE]
+ + " serializer isn't supported by " + input.getAccessor());
+ }
+
+ if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
+ throw new UserDataException("HiveInputFormatFragmenter expected "
+ + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
+ }
+
+ return toks;
+ }
+
+ /*
+ * Checks that the Hive fields and partitions match the HAWQ schema. Throws an
+ * exception if: - the number of fields (+ partitions) does not match the HAWQ
+ * table definition. - the Hive field types do not match the HAWQ field types.
+ */
+ @Override
+ void verifySchema(Table tbl) throws Exception {
+
+ int columnsSize = inputData.getColumns();
+ int hiveColumnsSize = tbl.getSd().getColsSize();
+ int hivePartitionsSize = tbl.getPartitionKeysSize();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive table: " + hiveColumnsSize + " fields, "
+ + hivePartitionsSize + " partitions. " + "HAWQ table: "
+ + columnsSize + " fields.");
+ }
+
+ // check schema size
+ if (columnsSize != (hiveColumnsSize + hivePartitionsSize)) {
+ throw new IllegalArgumentException("Hive table schema ("
+ + hiveColumnsSize + " fields, " + hivePartitionsSize
+ + " partitions) " + "doesn't match PXF table ("
+ + columnsSize + " fields)");
+ }
+
+ int index = 0;
+ // check hive fields
+ List<FieldSchema> hiveColumns = tbl.getSd().getCols();
+ for (FieldSchema hiveCol : hiveColumns) {
+ ColumnDescriptor colDesc = inputData.getColumn(index++);
+ DataType colType = DataType.get(colDesc.columnTypeCode());
+ compareTypes(colType, hiveCol.getType(), colDesc.columnName());
+ }
+ // check partition fields
+ List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
+ for (FieldSchema hivePart : hivePartitions) {
+ ColumnDescriptor colDesc = inputData.getColumn(index++);
+ DataType colType = DataType.get(colDesc.columnTypeCode());
+ compareTypes(colType, hivePart.getType(), colDesc.columnName());
+ }
+
+ }
+
+ private void compareTypes(DataType type, String hiveType, String fieldName) {
+ String convertedHive = toHiveType(type, fieldName);
+ if (!convertedHive.equals(hiveType)
+ && !(convertedHive.equals("smallint") && hiveType.equals("tinyint"))) {
+ throw new UnsupportedTypeException(
+ "Schema mismatch definition: Field " + fieldName
+ + " (Hive type " + hiveType + ", HAWQ type "
+ + type.toString() + ")");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Field " + fieldName + ": Hive type " + hiveType
+ + ", HAWQ type " + type.toString());
+ }
+ }
+
+ /**
+ * Converts a HAWQ type to a Hive type. The supported mappings are:<ul>
+ * <li>{@code BOOLEAN -> boolean}</li>
+ * <li>{@code SMALLINT -> smallint (tinyint is converted to smallint)}</li>
+ * <li>{@code BIGINT -> bigint}</li>
+ * <li>{@code TIMESTAMP, TIME -> timestamp}</li>
+ * <li>{@code NUMERIC -> decimal}</li>
+ * <li>{@code BYTEA -> binary}</li>
+ * <li>{@code INTEGER -> int}</li>
+ * <li>{@code TEXT -> string}</li>
+ * <li>{@code REAL -> float}</li>
+ * <li>{@code FLOAT8 -> double}</li>
+ * </ul>
+ * All other types (both in HAWQ and in Hive) are not supported.
+ *
+ * @param type HAWQ data type
+ * @param name field name
+ * @return Hive type
+ * @throws UnsupportedTypeException if type is not supported
+ */
+ public static String toHiveType(DataType type, String name) {
+ switch (type) {
+ case BOOLEAN:
+ case SMALLINT:
+ case BIGINT:
+ case TIMESTAMP:
+ return type.toString().toLowerCase();
+ case NUMERIC:
+ return "decimal";
+ case BYTEA:
+ return "binary";
+ case INTEGER:
+ return "int";
+ case TEXT:
+ return "string";
+ case REAL:
+ return "float";
+ case FLOAT8:
+ return "double";
+ case TIME:
+ return "timestamp";
+ default:
+ throw new UnsupportedTypeException(
+ type.toString()
+ + " conversion is not supported by HiveInputFormatFragmenter (Field "
+ + name + ")");
+ }
+ }
+
+ /*
+ * Validates that partition format corresponds to PXF supported formats and
+ * transforms the class name to an enumeration for writing it to the
+ * accessors on other PXF instances.
+ */
+ private String assertFileType(String className, HiveTablePartition partData)
+ throws Exception {
+ switch (className) {
+ case STR_RC_FILE_INPUT_FORMAT:
+ return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name();
+ case STR_TEXT_FILE_INPUT_FORMAT:
+ return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name();
+ default:
+ throw new IllegalArgumentException(
+ "HiveInputFormatFragmenter does not yet support "
+ + className
+ + " for "
+ + partData
+ + ". Supported InputFormat are "
+ + Arrays.toString(PXF_HIVE_INPUT_FORMATS.values()));
+ }
+ }
+
+ /*
+ * Validates that partition serde corresponds to PXF supported serdes and
+ * transforms the class name to an enumeration for writing it to the
+ * resolvers on other PXF instances.
+ */
+ private String assertSerde(String className, HiveTablePartition partData)
+ throws Exception {
+ switch (className) {
+ case STR_COLUMNAR_SERDE:
+ return PXF_HIVE_SERDES.COLUMNAR_SERDE.name();
+ case STR_LAZY_BINARY_COLUMNAR_SERDE:
+ return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
+ case STR_LAZY_SIMPLE_SERDE:
+ return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
+ default:
+ throw new UnsupportedTypeException(
+ "HiveInputFormatFragmenter does not yet support "
+ + className + " for " + partData
+ + ". Supported serializers are: "
+ + Arrays.toString(PXF_HIVE_SERDES.values()));
+ }
+ }
+
+ @Override
+ byte[] makeUserData(HiveTablePartition partData) throws Exception {
+ String inputFormatName = partData.storageDesc.getInputFormat();
+ String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
+ String partitionKeys = serializePartitionKeys(partData);
+
+ assertFileType(inputFormatName, partData);
+ String userData = assertSerde(serdeName, partData) + HIVE_UD_DELIM
+ + partitionKeys + HIVE_UD_DELIM + filterInFragmenter;
+
+ return userData.getBytes();
+ }
+}
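
A minimal sketch of the user-data handshake between makeUserData() on the
fragmenter side and parseToks() on the accessor side. The delimiter value and
the partition-keys token below are illustrative placeholders; the real
HIVE_UD_DELIM constant and partition encoding live in HiveDataFragmenter, which
is not part of this hunk.

    public class UserDataSketch {
        // placeholder for HiveDataFragmenter.HIVE_UD_DELIM (actual value differs)
        private static final String DELIM = "!HUDD!";

        public static void main(String[] args) {
            // fragmenter side: serde enum name, partition keys, filter flag
            String userData = "LAZY_SIMPLE_SERDE" + DELIM + "noPartitions" + DELIM + "false";

            // accessor side: split back into the three expected tokens
            String[] toks = userData.split(DELIM);
            System.out.println("TOK_SERDE:       " + toks[0]);
            System.out.println("TOK_KEYS:        " + toks[1]);
            System.out.println("TOK_FILTER_DONE: " + toks[2]);
        }
    }
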
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
new file mode 100644
index 0000000..2ee215f
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
@@ -0,0 +1,35 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+
+import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+
+/**
+ * Specialization of HiveAccessor for a Hive table stored as Text files.
+ * Use together with {@link HiveInputFormatFragmenter}/{@link HiveStringPassResolver}.
+ */
+public class HiveLineBreakAccessor extends HiveAccessor {
+
+ /**
+ * Constructs a HiveLineBreakAccessor.
+ *
+ * @param input input containing user data
+ * @throws Exception if user data was wrong
+ */
+ public HiveLineBreakAccessor(InputData input) throws Exception {
+ super(input, new TextInputFormat());
+ ((TextInputFormat) inputFormat).configure(jobConf);
+ String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name());
+ initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
+ filterInFragmenter = Boolean.parseBoolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ }
+
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+ return new LineRecordReader(jobConf, (FileSplit) split);
+ }
+}
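
A hedged sketch of how the LineRecordReader returned by getReader() is
consumed. In PXF the read loop is driven by the accessor framework rather than
user code; this fragment only illustrates the Hadoop RecordReader contract.

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;

    import java.io.IOException;

    public class LineReaderSketch {
        // reads every text line of one split; each line later becomes the
        // input to HiveStringPassResolver.getFields()
        static void dumpSplit(JobConf jobConf, FileSplit split) throws IOException {
            LineRecordReader reader = new LineRecordReader(jobConf, split);
            LongWritable key = reader.createKey();
            Text value = reader.createValue();
            while (reader.next(key, value)) {
                System.out.println(value);
            }
            reader.close();
        }
    }
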
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
new file mode 100644
index 0000000..128fc37
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -0,0 +1,79 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+
+/**
+ * Class for connecting to Hive's MetaStore and getting the schema of Hive tables.
+ */
+public class HiveMetadataFetcher extends MetadataFetcher {
+
+ private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
+ private HiveMetaStoreClient client;
+
+ public HiveMetadataFetcher() {
+ super();
+
+ // init hive metastore client connection.
+ client = HiveUtilities.initHiveClient();
+ }
+
+ @Override
+ public Metadata getTableMetadata(String tableName) throws Exception {
+
+ Metadata.Table tblDesc = HiveUtilities.parseTableQualifiedName(tableName);
+ Metadata metadata = new Metadata(tblDesc);
+
+ Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
+
+ getSchema(tbl, metadata);
+
+ return metadata;
+ }
+
+
+ /**
+ * Populates the given metadata object with the given table's fields and partitions.
+ * The partition fields are added at the end of the table schema.
+ * Throws an exception if the table contains unsupported field types.
+ *
+ * @param tbl Hive table
+ * @param metadata schema of given table
+ */
+ private void getSchema(Table tbl, Metadata metadata) {
+
+ int hiveColumnsSize = tbl.getSd().getColsSize();
+ int hivePartitionsSize = tbl.getPartitionKeysSize();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
+ }
+
+ // check hive fields
+ try {
+ List<FieldSchema> hiveColumns = tbl.getSd().getCols();
+ for (FieldSchema hiveCol : hiveColumns) {
+ metadata.addField(HiveUtilities.mapHiveType(hiveCol));
+ }
+ // check partition fields
+ List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
+ for (FieldSchema hivePart : hivePartitions) {
+ metadata.addField(HiveUtilities.mapHiveType(hivePart));
+ }
+ } catch (UnsupportedTypeException e) {
+ String errorMsg = "Failed to retrieve metadata for table " + metadata.getTable() + ". " +
+ e.getMessage();
+ throw new UnsupportedTypeException(errorMsg);
+ }
+ }
+}
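
A short usage sketch of the fetcher; the table name "default.sales" is invented
for illustration. getFields(), getName() and getType() are exercised the same
way by the (removed) HiveMetadataFetcherTest later in this commit.

    import org.apache.hawq.pxf.api.Metadata;
    import org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher;

    public class MetadataSketch {
        public static void main(String[] args) throws Exception {
            // assumes a reachable Hive MetaStore configured on the classpath
            HiveMetadataFetcher fetcher = new HiveMetadataFetcher();
            Metadata metadata = fetcher.getTableMetadata("default.sales");
            for (Metadata.Field field : metadata.getFields()) {
                // prints mapped HAWQ types, e.g. "field1 -> text", "field2 -> int4"
                System.out.println(field.getName() + " -> " + field.getType());
            }
        }
    }
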
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
new file mode 100644
index 0000000..0a8e5be
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
@@ -0,0 +1,39 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+
+import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+
+/**
+ * Specialization of HiveAccessor for a Hive table that stores only RC files.
+ * This class replaces the generic HiveAccessor for a case where a table is stored entirely as RC files.
+ * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}.
+ */
+public class HiveRCFileAccessor extends HiveAccessor {
+
+ /**
+ * Constructs a HiveRCFileAccessor.
+ *
+ * @param input input containing user data
+ * @throws Exception if user data was wrong
+ */
+ public HiveRCFileAccessor(InputData input) throws Exception {
+ super(input, new RCFileInputFormat());
+ String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name());
+ initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
+ filterInFragmenter = Boolean.parseBoolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ }
+
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+ return new RCFileRecordReader(jobConf, (FileSplit) split);
+ }
+}
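
A small sketch of the serde gate that parseToks() applies for this accessor:
the first user-data token must be one of the two accepted serde enum names.
The token value below is invented.

    import java.util.Arrays;

    public class SerdeGateSketch {
        public static void main(String[] args) {
            String[] supported = {"COLUMNAR_SERDE", "LAZY_BINARY_COLUMNAR_SERDE"};
            String tokSerde = "LAZY_SIMPLE_SERDE"; // first token of the user data
            // the same membership test parseToks() runs before anything else
            if (!Arrays.asList(supported).contains(tokSerde)) {
                System.out.println(tokSerde + " serializer isn't supported");
            }
        }
    }
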
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
new file mode 100644
index 0000000..ce644e1
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -0,0 +1,608 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+
+import org.apache.commons.lang.CharUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+
+/**
+ * Class HiveResolver handles deserialization of records that were serialized
+ * using Hadoop's Hive serialization framework.
+ */
+/*
+ * TODO - remove SuppressWarnings once Hive resolves the problem described below.
+ * This line, and the change of the deserializer member to Object instead of the
+ * original Deserializer, all stem from the same issue. In Hive
+ * 0.11.0 the API changed and all serde types extend a new interface,
+ * AbstractSerDe. But this change was not adopted by the OrcSerde (which was
+ * also introduced in Hive 0.11.0). This bit of juggling is necessary in order
+ * to cope with that inconsistency.
+ */
+@SuppressWarnings("deprecation")
+public class HiveResolver extends Plugin implements ReadResolver {
+ private static final Log LOG = LogFactory.getLog(HiveResolver.class);
+ private static final String MAPKEY_DELIM = ":";
+ private static final String COLLECTION_DELIM = ",";
+ private SerDe deserializer;
+ private List<OneField> partitionFields;
+ private String serdeName;
+ private String propsString;
+ private String collectionDelim;
+ private String mapkeyDelim;
+ String partitionKeys;
+ char delimiter;
+ String nullChar = "\\N";
+ private Configuration conf;
+ private String hiveDefaultPartName;
+
+ /**
+ * Constructs the HiveResolver by parsing the userdata in the input and
+ * obtaining the serde class name, the serde properties string and the
+ * partition keys.
+ *
+ * @param input contains the Serde class name, the serde properties string
+ * and the partition keys
+ * @throws Exception if user data was wrong or serde failed to be
+ * instantiated
+ */
+ public HiveResolver(InputData input) throws Exception {
+ super(input);
+
+ conf = new Configuration();
+ hiveDefaultPartName = HiveConf.getVar(conf,
+ HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+ LOG.debug("Hive's default partition name is " + hiveDefaultPartName);
+
+ parseUserData(input);
+ initPartitionFields();
+ initSerde(input);
+ }
+
+ @Override
+ public List<OneField> getFields(OneRow onerow) throws Exception {
+ Object tuple = deserializer.deserialize((Writable) onerow.getData());
+ // Each Hive record is a Struct
+ StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
+ List<OneField> record = traverseStruct(tuple, soi, false);
+ /*
+ * We follow Hive convention. Partition fields are always added at the
+ * end of the record
+ */
+ record.addAll(partitionFields);
+
+ return record;
+ }
+
+ /* Parses the user data string (received from the fragmenter). */
+ void parseUserData(InputData input) throws Exception {
+ final int EXPECTED_NUM_OF_TOKS = 5;
+
+ String userData = new String(input.getFragmentUserData());
+ String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+
+ if (toks.length != EXPECTED_NUM_OF_TOKS) {
+ throw new UserDataException("HiveResolver expected "
+ + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
+ }
+
+ serdeName = toks[1];
+ propsString = toks[2];
+ partitionKeys = toks[3];
+
+ collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
+ : input.getUserProperty("COLLECTION_DELIM");
+ mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
+ : input.getUserProperty("MAPKEY_DELIM");
+ }
+
+ /* Gets and initializes the deserializer for the records of this Hive data fragment. */
+ void initSerde(InputData inputData) throws Exception {
+ Properties serdeProperties;
+
+ Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader());
+ deserializer = (SerDe) c.newInstance();
+ serdeProperties = new Properties();
+ ByteArrayInputStream inStream = new ByteArrayInputStream(
+ propsString.getBytes());
+ serdeProperties.load(inStream);
+ deserializer.initialize(new JobConf(conf, HiveResolver.class),
+ serdeProperties);
+ }
+
+ /*
+ * The partition fields are initialized once, based on the userData provided
+ * by the fragmenter.
+ */
+ void initPartitionFields() {
+ partitionFields = new LinkedList<>();
+ if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
+ return;
+ }
+
+ String[] partitionLevels = partitionKeys.split(HiveDataFragmenter.HIVE_PARTITIONS_DELIM);
+ for (String partLevel : partitionLevels) {
+ String[] levelKey = partLevel.split(HiveDataFragmenter.HIVE_1_PART_DELIM);
+ String type = levelKey[1];
+ String val = levelKey[2];
+ DataType convertedType;
+ Object convertedValue = null;
+ boolean isDefaultPartition = false;
+
+ LOG.debug("Partition type: " + type + ", value: " + val);
+ // check if value is default partition
+ isDefaultPartition = isDefaultPartition(type, val);
+ // ignore the type's parameters
+ String typeName = type.replaceAll("\\(.*\\)", "");
+
+ switch (typeName) {
+ case serdeConstants.STRING_TYPE_NAME:
+ convertedType = TEXT;
+ convertedValue = isDefaultPartition ? null : val;
+ break;
+ case serdeConstants.BOOLEAN_TYPE_NAME:
+ convertedType = BOOLEAN;
+ convertedValue = isDefaultPartition ? null
+ : Boolean.valueOf(val);
+ break;
+ case serdeConstants.TINYINT_TYPE_NAME:
+ case serdeConstants.SMALLINT_TYPE_NAME:
+ convertedType = SMALLINT;
+ convertedValue = isDefaultPartition ? null
+ : Short.parseShort(val);
+ break;
+ case serdeConstants.INT_TYPE_NAME:
+ convertedType = INTEGER;
+ convertedValue = isDefaultPartition ? null
+ : Integer.parseInt(val);
+ break;
+ case serdeConstants.BIGINT_TYPE_NAME:
+ convertedType = BIGINT;
+ convertedValue = isDefaultPartition ? null
+ : Long.parseLong(val);
+ break;
+ case serdeConstants.FLOAT_TYPE_NAME:
+ convertedType = REAL;
+ convertedValue = isDefaultPartition ? null
+ : Float.parseFloat(val);
+ break;
+ case serdeConstants.DOUBLE_TYPE_NAME:
+ convertedType = FLOAT8;
+ convertedValue = isDefaultPartition ? null
+ : Double.parseDouble(val);
+ break;
+ case serdeConstants.TIMESTAMP_TYPE_NAME:
+ convertedType = TIMESTAMP;
+ convertedValue = isDefaultPartition ? null
+ : Timestamp.valueOf(val);
+ break;
+ case serdeConstants.DATE_TYPE_NAME:
+ convertedType = DATE;
+ convertedValue = isDefaultPartition ? null
+ : Date.valueOf(val);
+ break;
+ case serdeConstants.DECIMAL_TYPE_NAME:
+ convertedType = NUMERIC;
+ convertedValue = isDefaultPartition ? null
+ : HiveDecimal.create(val).bigDecimalValue().toString();
+ break;
+ case serdeConstants.VARCHAR_TYPE_NAME:
+ convertedType = VARCHAR;
+ convertedValue = isDefaultPartition ? null : val;
+ break;
+ case serdeConstants.CHAR_TYPE_NAME:
+ convertedType = BPCHAR;
+ convertedValue = isDefaultPartition ? null : val;
+ break;
+ case serdeConstants.BINARY_TYPE_NAME:
+ convertedType = BYTEA;
+ convertedValue = isDefaultPartition ? null : val.getBytes();
+ break;
+ default:
+ throw new UnsupportedTypeException(
+ "Unsupported partition type: " + type);
+ }
+ addOneFieldToRecord(partitionFields, convertedType, convertedValue);
+ }
+ }
+
+ /*
+ * The partition fields are initialized once, based on the userData provided
+ * by the fragmenter.
+ */
+ int initPartitionFields(StringBuilder parts) {
+ if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
+ return 0;
+ }
+ String[] partitionLevels = partitionKeys.split(HiveDataFragmenter.HIVE_PARTITIONS_DELIM);
+ for (String partLevel : partitionLevels) {
+ String[] levelKey = partLevel.split(HiveDataFragmenter.HIVE_1_PART_DELIM);
+ String type = levelKey[1];
+ String val = levelKey[2];
+ parts.append(delimiter);
+
+ if (isDefaultPartition(type, val)) {
+ parts.append(nullChar);
+ } else {
+ // ignore the type's parameters
+ String typeName = type.replaceAll("\\(.*\\)", "");
+ switch (typeName) {
+ case serdeConstants.STRING_TYPE_NAME:
+ case serdeConstants.VARCHAR_TYPE_NAME:
+ case serdeConstants.CHAR_TYPE_NAME:
+ parts.append(val);
+ break;
+ case serdeConstants.BOOLEAN_TYPE_NAME:
+ parts.append(Boolean.parseBoolean(val));
+ break;
+ case serdeConstants.TINYINT_TYPE_NAME:
+ case serdeConstants.SMALLINT_TYPE_NAME:
+ parts.append(Short.parseShort(val));
+ break;
+ case serdeConstants.INT_TYPE_NAME:
+ parts.append(Integer.parseInt(val));
+ break;
+ case serdeConstants.BIGINT_TYPE_NAME:
+ parts.append(Long.parseLong(val));
+ break;
+ case serdeConstants.FLOAT_TYPE_NAME:
+ parts.append(Float.parseFloat(val));
+ break;
+ case serdeConstants.DOUBLE_TYPE_NAME:
+ parts.append(Double.parseDouble(val));
+ break;
+ case serdeConstants.TIMESTAMP_TYPE_NAME:
+ parts.append(Timestamp.valueOf(val));
+ break;
+ case serdeConstants.DATE_TYPE_NAME:
+ parts.append(Date.valueOf(val));
+ break;
+ case serdeConstants.DECIMAL_TYPE_NAME:
+ parts.append(HiveDecimal.create(val).bigDecimalValue());
+ break;
+ case serdeConstants.BINARY_TYPE_NAME:
+ Utilities.byteArrayToOctalString(val.getBytes(), parts);
+ break;
+ default:
+ throw new UnsupportedTypeException(
+ "Unsupported partition type: " + type);
+ }
+ }
+ }
+ return partitionLevels.length;
+ }
+
+ /**
+ * Returns true if the partition value is Hive's default partition name
+ * (defined in hive.exec.default.partition.name).
+ *
+ * @param partitionType partition field type
+ * @param partitionValue partition value
+ * @return true if the partition value is Hive's default partition
+ */
+ private boolean isDefaultPartition(String partitionType,
+ String partitionValue) {
+ boolean isDefaultPartition = false;
+ if (hiveDefaultPartName.equals(partitionValue)) {
+ LOG.debug("partition " + partitionType
+ + " is hive default partition (value " + partitionValue
+ + "), converting field to NULL");
+ isDefaultPartition = true;
+ }
+ return isDefaultPartition;
+ }
+
+ /*
+ * If the object representing the whole record is null or if an object
+ * representing a composite sub-object (map, list, ...) is null, then a
+ * BadRecordException will be thrown. If a primitive field value is null,
+ * then a null will appear for the field in the record in the query result.
+ */
+ private void traverseTuple(Object obj, ObjectInspector objInspector,
+ List<OneField> record, boolean toFlatten)
+ throws IOException, BadRecordException {
+ ObjectInspector.Category category = objInspector.getCategory();
+ if ((obj == null) && (category != ObjectInspector.Category.PRIMITIVE)) {
+ throw new BadRecordException("NULL Hive composite object");
+ }
+ switch (category) {
+ case PRIMITIVE:
+ resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector,
+ record, toFlatten);
+ break;
+ case LIST:
+ List<OneField> listRecord = traverseList(obj,
+ (ListObjectInspector) objInspector);
+ addOneFieldToRecord(record, TEXT, String.format("[%s]",
+ HdfsUtilities.toString(listRecord, collectionDelim)));
+ break;
+ case MAP:
+ List<OneField> mapRecord = traverseMap(obj,
+ (MapObjectInspector) objInspector);
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(mapRecord, collectionDelim)));
+ break;
+ case STRUCT:
+ List<OneField> structRecord = traverseStruct(obj,
+ (StructObjectInspector) objInspector, true);
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(structRecord, collectionDelim)));
+ break;
+ case UNION:
+ List<OneField> unionRecord = traverseUnion(obj,
+ (UnionObjectInspector) objInspector);
+ addOneFieldToRecord(record, TEXT, String.format("[%s]",
+ HdfsUtilities.toString(unionRecord, collectionDelim)));
+ break;
+ default:
+ throw new UnsupportedTypeException("Unknown category type: "
+ + objInspector.getCategory());
+ }
+ }
+
+ private List<OneField> traverseUnion(Object obj, UnionObjectInspector uoi)
+ throws BadRecordException, IOException {
+ List<OneField> unionRecord = new LinkedList<>();
+ List<? extends ObjectInspector> ois = uoi.getObjectInspectors();
+ if (ois == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type Union");
+ }
+ traverseTuple(uoi.getField(obj), ois.get(uoi.getTag(obj)), unionRecord,
+ true);
+ return unionRecord;
+ }
+
+ private List<OneField> traverseList(Object obj, ListObjectInspector loi)
+ throws BadRecordException, IOException {
+ List<OneField> listRecord = new LinkedList<>();
+ List<?> list = loi.getList(obj);
+ ObjectInspector eoi = loi.getListElementObjectInspector();
+ if (list == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type List");
+ }
+ for (Object object : list) {
+ traverseTuple(object, eoi, listRecord, true);
+ }
+ return listRecord;
+ }
+
+ private List<OneField> traverseStruct(Object struct,
+ StructObjectInspector soi,
+ boolean toFlatten)
+ throws BadRecordException, IOException {
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> structFields = soi.getStructFieldsDataAsList(struct);
+ if (structFields == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type Struct");
+ }
+ List<OneField> structRecord = new LinkedList<>();
+ List<OneField> complexRecord = new LinkedList<>();
+ for (int i = 0; i < structFields.size(); i++) {
+ if (toFlatten) {
+ complexRecord.add(new OneField(TEXT.getOID(), String.format(
+ "\"%s\"", fields.get(i).getFieldName())));
+ }
+ traverseTuple(structFields.get(i),
+ fields.get(i).getFieldObjectInspector(), complexRecord,
+ toFlatten);
+ if (toFlatten) {
+ addOneFieldToRecord(structRecord, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ complexRecord.clear();
+ }
+ }
+ return toFlatten ? structRecord : complexRecord;
+ }
+
+ private List<OneField> traverseMap(Object obj, MapObjectInspector moi)
+ throws BadRecordException, IOException {
+ List<OneField> complexRecord = new LinkedList<>();
+ List<OneField> mapRecord = new LinkedList<>();
+ ObjectInspector koi = moi.getMapKeyObjectInspector();
+ ObjectInspector voi = moi.getMapValueObjectInspector();
+ Map<?, ?> map = moi.getMap(obj);
+ if (map == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type Map");
+ } else if (map.isEmpty()) {
+ traverseTuple(null, koi, complexRecord, true);
+ traverseTuple(null, voi, complexRecord, true);
+ addOneFieldToRecord(mapRecord, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ } else {
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ traverseTuple(entry.getKey(), koi, complexRecord, true);
+ traverseTuple(entry.getValue(), voi, complexRecord, true);
+ addOneFieldToRecord(mapRecord, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ complexRecord.clear();
+ }
+ }
+ return mapRecord;
+ }
+
+ private void resolvePrimitive(Object o, PrimitiveObjectInspector oi,
+ List<OneField> record, boolean toFlatten)
+ throws IOException {
+ Object val;
+ switch (oi.getPrimitiveCategory()) {
+ case BOOLEAN: {
+ val = (o != null) ? ((BooleanObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, BOOLEAN, val);
+ break;
+ }
+ case SHORT: {
+ val = (o != null) ? ((ShortObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, SMALLINT, val);
+ break;
+ }
+ case INT: {
+ val = (o != null) ? ((IntObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, INTEGER, val);
+ break;
+ }
+ case LONG: {
+ val = (o != null) ? ((LongObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, BIGINT, val);
+ break;
+ }
+ case FLOAT: {
+ val = (o != null) ? ((FloatObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, REAL, val);
+ break;
+ }
+ case DOUBLE: {
+ val = (o != null) ? ((DoubleObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, FLOAT8, val);
+ break;
+ }
+ case DECIMAL: {
+ String sVal = null;
+ if (o != null) {
+ HiveDecimal hd = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+ if (hd != null) {
+ BigDecimal bd = hd.bigDecimalValue();
+ sVal = bd.toString();
+ }
+ }
+ addOneFieldToRecord(record, NUMERIC, sVal);
+ break;
+ }
+ case STRING: {
+ val = (o != null) ? ((StringObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, TEXT,
+ toFlatten ? String.format("\"%s\"", val) : val);
+ break;
+ }
+ case VARCHAR:
+ val = (o != null) ? ((HiveVarcharObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, VARCHAR,
+ toFlatten ? String.format("\"%s\"", val) : val);
+ break;
+ case CHAR:
+ val = (o != null) ? ((HiveCharObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, BPCHAR,
+ toFlatten ? String.format("\"%s\"", val) : val);
+ break;
+ case BINARY: {
+ byte[] toEncode = null;
+ if (o != null) {
+ BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(o);
+ toEncode = new byte[bw.getLength()];
+ System.arraycopy(bw.getBytes(), 0, toEncode, 0,
+ bw.getLength());
+ }
+ addOneFieldToRecord(record, BYTEA, toEncode);
+ break;
+ }
+ case TIMESTAMP: {
+ val = (o != null) ? ((TimestampObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, TIMESTAMP, val);
+ break;
+ }
+ case DATE:
+ val = (o != null) ? ((DateObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, DATE, val);
+ break;
+ case BYTE: { /* TINYINT */
+ val = (o != null) ? new Short(((ByteObjectInspector) oi).get(o))
+ : null;
+ addOneFieldToRecord(record, SMALLINT, val);
+ break;
+ }
+ default: {
+ throw new UnsupportedTypeException(oi.getTypeName()
+ + " conversion is not supported by "
+ + getClass().getSimpleName());
+ }
+ }
+ }
+
+ private void addOneFieldToRecord(List<OneField> record,
+ DataType gpdbWritableType, Object val) {
+ record.add(new OneField(gpdbWritableType.getOID(), val));
+ }
+
+ /*
+ * Gets the delimiter character from the URL, verifies and stores it. Must be a
+ * single ASCII character (the same restriction as HAWQ's). If a hex
+ * representation was passed, converts it to its character.
+ */
+ void parseDelimiterChar(InputData input) {
+
+ String userDelim = input.getUserProperty("DELIMITER");
+
+ final int VALID_LENGTH = 1;
+ final int VALID_LENGTH_HEX = 4;
+
+ if (userDelim.startsWith("\\x")) { // hexadecimal sequence
+
+ if (userDelim.length() != VALID_LENGTH_HEX) {
+ throw new IllegalArgumentException(
+ "Invalid hexdecimal value for delimiter (got"
+ + userDelim + ")");
+ }
+
+ delimiter = (char) Integer.parseInt(
+ userDelim.substring(2, VALID_LENGTH_HEX), 16);
+
+ if (!CharUtils.isAscii(delimiter)) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+ + delimiter + ")");
+ }
+
+ return;
+ }
+
+ if (userDelim.length() != VALID_LENGTH) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
+ + userDelim + ")");
+ }
+
+ if (!CharUtils.isAscii(userDelim.charAt(0))) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+ + userDelim + ")");
+ }
+
+ delimiter = userDelim.charAt(0);
+ }
+}
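
A worked example of the hex branch in parseDelimiterChar(): a DELIMITER user
property of "\x2C" (exactly four characters) decodes to the comma character.

    public class DelimiterSketch {
        public static void main(String[] args) {
            String userDelim = "\\x2C"; // the literal 4-character string \x2C
            // same decoding as parseDelimiterChar(): hex digits after "\x", base 16
            char delimiter = (char) Integer.parseInt(userDelim.substring(2, 4), 16);
            System.out.println((int) delimiter + " -> '" + delimiter + "'"); // 44 -> ','
        }
    }
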
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
new file mode 100644
index 0000000..90ed1f2
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -0,0 +1,54 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
+
+/**
+ * Specialized HiveResolver for a Hive table stored as Text files.
+ * Use together with HiveInputFormatFragmenter/HiveLineBreakAccessor.
+ */
+public class HiveStringPassResolver extends HiveResolver {
+ private StringBuilder parts;
+
+ public HiveStringPassResolver(InputData input) throws Exception {
+ super(input);
+ }
+
+ @Override
+ void parseUserData(InputData input) throws Exception {
+ String userData = new String(input.getFragmentUserData());
+ String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+ parseDelimiterChar(input);
+ parts = new StringBuilder();
+ partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ }
+
+ @Override
+ void initSerde(InputData input) {
+ /* nothing to do here */
+ }
+
+ @Override
+ void initPartitionFields() {
+ initPartitionFields(parts);
+ }
+
+ /**
+ * Returns a singleton list containing one OneField item. The OneField item
+ * holds two fields: an integer representing the VARCHAR type and a Java
+ * Object holding the field value.
+ */
+ @Override
+ public List<OneField> getFields(OneRow onerow) throws Exception {
+ String line = (onerow.getData()).toString();
+
+ /* We follow Hive convention. Partition fields are always added at the end of the record */
+ return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+ }
+}
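
Illustrative only: the shape of the single VARCHAR field this resolver emits.
The raw line, delimiter, and partition value below are invented; in the real
flow the line comes from HiveLineBreakAccessor and parts is filled by
initPartitionFields(parts).

    public class StringPassSketch {
        public static void main(String[] args) {
            String line = "john,32";                // raw line from the accessor
            StringBuilder parts = new StringBuilder();
            parts.append(',').append("2015-10-28"); // one partition value, appended last
            System.out.println(line + parts);       // prints: john,32,2015-10-28
        }
    }
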
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
new file mode 100644
index 0000000..55d41a6
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -0,0 +1,219 @@
+package org.apache.hawq.pxf.plugins.hive.utilities;
+
+import java.util.ArrayList;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+
+/**
+ * Class containing helper functions for connecting
+ * to and interacting with Hive.
+ */
+public class HiveUtilities {
+
+ private static final Log LOG = LogFactory.getLog(HiveUtilities.class);
+
+ /**
+ * Default Hive DB (schema) name.
+ */
+ private static final String HIVE_DEFAULT_DBNAME = "default";
+
+ /**
+ * Initializes the HiveMetaStoreClient.
+ * Uses classpath configuration files to locate the MetaStore.
+ *
+ * @return initialized client
+ */
+ public static HiveMetaStoreClient initHiveClient() {
+ HiveMetaStoreClient client = null;
+ try {
+ client = new HiveMetaStoreClient(new HiveConf());
+ } catch (MetaException cause) {
+ throw new RuntimeException("Failed connecting to Hive MetaStore service: " + cause.getMessage(), cause);
+ }
+ return client;
+ }
+
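+ /**
+ * Fetches a Hive table definition from the MetaStore. Hive views
+ * (VIRTUAL_VIEW) are rejected, as they are not supported by HAWQ.
+ *
+ * @param client active Hive MetaStore client
+ * @param tableName db and table names of the requested table
+ * @return the Hive table definition
+ * @throws Exception if the table cannot be fetched or is a view
+ */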
+ public static Table getHiveTable(HiveMetaStoreClient client, Metadata.Table tableName)
+ throws Exception {
+ Table tbl = client.getTable(tableName.getDbName(), tableName.getTableName());
+ String tblType = tbl.getTableType();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Table: " + tableName.getDbName() + "." + tableName.getTableName() + ", type: " + tblType);
+ }
+
+ if (TableType.valueOf(tblType) == TableType.VIRTUAL_VIEW) {
+ throw new UnsupportedOperationException("Hive views are not supported by HAWQ");
+ }
+
+ return tbl;
+ }
+
+ /**
+ * Checks if hive type is supported, and if so
+ * return its matching HAWQ type.
+ * Unsupported types will result in an exception.
+ * <br>
+ * The supported mappings are:<ul>
+ * <li>{@code tinyint -> int2}</li>
+ * <li>{@code smallint -> int2}</li>
+ * <li>{@code int -> int4}</li>
+ * <li>{@code bigint -> int8}</li>
+ * <li>{@code boolean -> bool}</li>
+ * <li>{@code float -> float4}</li>
+ * <li>{@code double -> float8}</li>
+ * <li>{@code string -> text}</li>
+ * <li>{@code binary -> bytea}</li>
+ * <li>{@code timestamp -> timestamp}</li>
+ * <li>{@code date -> date}</li>
+ * <li>{@code decimal(precision, scale) -> numeric(precision, scale)}</li>
+ * <li>{@code varchar(size) -> varchar(size)}</li>
+ * <li>{@code char(size) -> bpchar(size)}</li>
+ * </ul>
+ *
+ * @param hiveColumn hive column schema
+ * @return field with mapped HAWQ type and modifiers
+ * @throws UnsupportedTypeException if the column type is not supported
+ */
+ public static Metadata.Field mapHiveType(FieldSchema hiveColumn) throws UnsupportedTypeException {
+ String fieldName = hiveColumn.getName();
+ String hiveType = hiveColumn.getType();
+ String mappedType;
+ String[] modifiers = null;
+
+ // check parameterized types:
+ if (hiveType.startsWith("varchar(") ||
+ hiveType.startsWith("char(")) {
+ String[] toks = hiveType.split("[(,)]");
+ if (toks.length != 2) {
+ throw new UnsupportedTypeException( "HAWQ does not support type " + hiveType + " (Field " + fieldName + "), " +
+ "expected type of the form <type name>(<parameter>)");
+ }
+ mappedType = toks[0];
+ if (mappedType.equals("char")) {
+ mappedType = "bpchar";
+ }
+ modifiers = new String[] {toks[1]};
+ } else if (hiveType.startsWith("decimal(")) {
+ String[] toks = hiveType.split("[(,)]");
+ if (toks.length != 3) {
+ throw new UnsupportedTypeException( "HAWQ does not support type " + hiveType + " (Field " + fieldName + "), " +
+ "expected type of the form <type name>(<parameter>,<parameter>)");
+ }
+ mappedType = "numeric";
+ modifiers = new String[] {toks[1], toks[2]};
+ } else {
+
+ switch (hiveType) {
+ case "tinyint":
+ case "smallint":
+ mappedType = "int2";
+ break;
+ case "int":
+ mappedType = "int4";
+ break;
+ case "bigint":
+ mappedType = "int8";
+ break;
+ case "boolean":
+ mappedType = "bool";
+ break;
+ case "timestamp":
+ case "date":
+ mappedType = hiveType;
+ break;
+ case "float":
+ mappedType = "float4";
+ break;
+ case "double":
+ mappedType = "float8";
+ break;
+ case "string":
+ mappedType = "text";
+ break;
+ case "binary":
+ mappedType = "bytea";
+ break;
+ default:
+ throw new UnsupportedTypeException(
+ "HAWQ does not support type " + hiveType + " (Field " + fieldName + ")");
+ }
+ }
+ if (!verifyModifiers(modifiers)) {
+ throw new UnsupportedTypeException("HAWQ does not support type " + hiveType + " (Field " + fieldName + "), modifiers should be integers");
+ }
+ return new Metadata.Field(fieldName, mappedType, modifiers);
+ }
+
+ /**
+ * Verifies modifiers are null or integers.
+ * Modifier is a value assigned to a type,
+ * e.g. size of a varchar - varchar(size).
+ *
+ * @param modifiers type modifiers to be verified
+ * @return whether modifiers are null or integers
+ */
+ private static boolean verifyModifiers(String[] modifiers) {
+ if (modifiers == null) {
+ return true;
+ }
+ for (String modifier: modifiers) {
+ if (StringUtils.isBlank(modifier) || !StringUtils.isNumeric(modifier)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Extracts the db_name and table_name from the qualifiedName.
+ * qualifiedName is the Hive table name that the user enters in the CREATE EXTERNAL TABLE statement
+ * or when querying HCatalog table.
+ * It can be either <code>table_name</code> or <code>db_name.table_name</code>.
+ *
+ * @param qualifiedName Hive table name
+ * @return {@link org.apache.hawq.pxf.api.Metadata.Table} object holding the full table name
+ */
+ public static Metadata.Table parseTableQualifiedName(String qualifiedName) {
+
+ String dbName, tableName;
+ String errorMsg = " is not a valid Hive table name. "
+ + "Should be either <table_name> or <db_name.table_name>";
+
+ if (StringUtils.isBlank(qualifiedName)) {
+ throw new IllegalArgumentException("empty string" + errorMsg);
+ }
+
+ String[] rawToks = qualifiedName.split("[.]");
+ ArrayList<String> toks = new ArrayList<String>();
+ for (String tok: rawToks) {
+ if (StringUtils.isBlank(tok)) {
+ continue;
+ }
+ toks.add(tok.trim());
+ }
+
+ if (toks.size() == 1) {
+ dbName = HIVE_DEFAULT_DBNAME;
+ tableName = toks.get(0);
+ } else if (toks.size() == 2) {
+ dbName = toks.get(0);
+ tableName = toks.get(1);
+ } else {
+ throw new IllegalArgumentException("\"" + qualifiedName + "\"" + errorMsg);
+ }
+
+ return new Metadata.Table(dbName, tableName);
+ }
+}
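
A minimal sketch of parseTableQualifiedName() behavior, matching the contract
asserted by the (removed) tests later in this commit; the table names are
invented.

    import org.apache.hawq.pxf.api.Metadata;
    import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;

    public class TableNameSketch {
        public static void main(String[] args) {
            // unqualified name: the "default" database is assumed
            Metadata.Table t1 = HiveUtilities.parseTableQualifiedName("sales");
            System.out.println(t1); // default.sales

            // qualified name: db and table split on the dot
            Metadata.Table t2 = HiveUtilities.parseTableQualifiedName("retail.sales");
            System.out.println(t2); // retail.sales

            // anything else throws IllegalArgumentException:
            // HiveUtilities.parseTableQualifiedName("t.r.o.u.b.l.e");
        }
    }
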
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenterTest.java b/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenterTest.java
deleted file mode 100755
index 3964e0b..0000000
--- a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenterTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.utilities.InputData;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({HiveDataFragmenter.class}) // Enables mocking 'new' calls
-@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf",
- "org.apache.hadoop.hive.metastore.api.MetaException",
- "com.pivotal.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
-public class HiveDataFragmenterTest {
- InputData inputData;
- Configuration hadoopConfiguration;
- JobConf jobConf;
- HiveConf hiveConfiguration;
- HiveMetaStoreClient hiveClient;
- HiveDataFragmenter fragmenter;
-
- @Test
- public void construction() throws Exception {
- prepareConstruction();
- fragmenter = new HiveDataFragmenter(inputData);
- PowerMockito.verifyNew(JobConf.class).withArguments(hadoopConfiguration, HiveDataFragmenter.class);
- PowerMockito.verifyNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration);
- }
-
- @Test
- public void constructorCantAccessMetaStore() throws Exception {
- prepareConstruction();
- PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration).thenThrow(new MetaException("which way to albuquerque"));
-
- try {
- fragmenter = new HiveDataFragmenter(inputData);
- fail("Expected a RuntimeException");
- } catch (RuntimeException ex) {
- assertEquals(ex.getMessage(), "Failed connecting to Hive MetaStore service: which way to albuquerque");
- }
- }
-
- @Test
- public void invalidTableName() throws Exception {
- prepareConstruction();
- fragmenter = new HiveDataFragmenter(inputData);
-
- when(inputData.getDataSource()).thenReturn("t.r.o.u.b.l.e.m.a.k.e.r");
-
- try {
- fragmenter.getFragments();
- fail("Expected an IllegalArgumentException");
- } catch (IllegalArgumentException ex) {
- assertEquals(ex.getMessage(), "\"t.r.o.u.b.l.e.m.a.k.e.r\" is not a valid Hive table name. Should be either <table_name> or <db_name.table_name>");
- }
- }
-
- private void prepareConstruction() throws Exception {
- inputData = mock(InputData.class);
-
- hadoopConfiguration = mock(Configuration.class);
- PowerMockito.whenNew(Configuration.class).withNoArguments().thenReturn(hadoopConfiguration);
-
- jobConf = mock(JobConf.class);
- PowerMockito.whenNew(JobConf.class).withArguments(hadoopConfiguration, HiveDataFragmenter.class).thenReturn(jobConf);
-
- hiveConfiguration = mock(HiveConf.class);
- PowerMockito.whenNew(HiveConf.class).withNoArguments().thenReturn(hiveConfiguration);
-
- hiveClient = mock(HiveMetaStoreClient.class);
- PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration).thenReturn(hiveClient);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilderTest.java b/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilderTest.java
deleted file mode 100755
index b21d9ba..0000000
--- a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilderTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import org.junit.Test;
-
-import java.util.List;
-
-import static com.pivotal.pxf.api.FilterParser.BasicFilter;
-import static com.pivotal.pxf.api.FilterParser.Operation;
-import static com.pivotal.pxf.api.FilterParser.Operation.*;
-import static org.junit.Assert.assertEquals;
-
-public class HiveFilterBuilderTest {
- @Test
- public void parseFilterWithThreeOperations() throws Exception {
- HiveFilterBuilder builder = new HiveFilterBuilder(null);
- String[] consts = new String[] {"first", "2", "3"};
- Operation[] ops = new Operation[] {HDOP_EQ, HDOP_GT, HDOP_LT};
- int[] idx = new int[] {1, 2, 3};
-
- @SuppressWarnings("unchecked")
- List<BasicFilter> filterList = (List) builder.getFilterObject("a1c\"first\"o5a2c2o2o7a3c3o1o7");
- assertEquals(consts.length, filterList.size());
- for (int i = 0; i < filterList.size(); i++) {
- BasicFilter filter = filterList.get(i);
- assertEquals(filter.getConstant().constant().toString(), consts[i]);
- assertEquals(filter.getOperation(), ops[i]);
- assertEquals(filter.getColumn().index(), idx[i]);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcherTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcherTest.java b/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcherTest.java
deleted file mode 100644
index 052f723..0000000
--- a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcherTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import com.pivotal.pxf.api.Metadata;
-import com.pivotal.pxf.plugins.hive.utilities.HiveUtilities;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({HiveMetadataFetcher.class}) // Enables mocking 'new' calls
-@SuppressStaticInitializationFor({"org.apache.hadoop.hive.metastore.api.MetaException",
-"com.pivotal.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
-public class HiveMetadataFetcherTest {
-
- Log LOG;
- HiveConf hiveConfiguration;
- HiveMetaStoreClient hiveClient;
- HiveMetadataFetcher fetcher;
- Metadata metadata;
-
- @Before
- public void SetupCompressionFactory() {
- LOG = mock(Log.class);
- Whitebox.setInternalState(HiveUtilities.class, LOG);
- }
-
- @Test
- public void construction() throws Exception {
- prepareConstruction();
- fetcher = new HiveMetadataFetcher();
- PowerMockito.verifyNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration);
- }
-
- @Test
- public void constructorCantAccessMetaStore() throws Exception {
- prepareConstruction();
- PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration).thenThrow(new MetaException("which way to albuquerque"));
-
- try {
- fetcher = new HiveMetadataFetcher();
- fail("Expected a RuntimeException");
- } catch (RuntimeException ex) {
- assertEquals("Failed connecting to Hive MetaStore service: which way to albuquerque", ex.getMessage());
- }
- }
-
- @Test
- public void getTableMetadataInvalidTableName() throws Exception {
- prepareConstruction();
- fetcher = new HiveMetadataFetcher();
- String tableName = "t.r.o.u.b.l.e.m.a.k.e.r";
-
- try {
- fetcher.getTableMetadata(tableName);
- fail("Expected an IllegalArgumentException");
- } catch (IllegalArgumentException ex) {
- assertEquals("\"t.r.o.u.b.l.e.m.a.k.e.r\" is not a valid Hive table name. Should be either <table_name> or <db_name.table_name>", ex.getMessage());
- }
- }
-
- @Test
- public void getTableMetadataView() throws Exception {
- prepareConstruction();
-
- fetcher = new HiveMetadataFetcher();
- String tableName = "cause";
-
- // mock hive table returned from hive client
- Table hiveTable = new Table();
- hiveTable.setTableType("VIRTUAL_VIEW");
- when(hiveClient.getTable("default", tableName)).thenReturn(hiveTable);
-
- try {
- metadata = fetcher.getTableMetadata(tableName);
- fail("Expected an UnsupportedOperationException because PXF doesn't support views");
- } catch (UnsupportedOperationException e) {
- assertEquals("Hive views are not supported by HAWQ", e.getMessage());
- }
- }
-
- @Test
- public void getTableMetadata() throws Exception {
- prepareConstruction();
-
- fetcher = new HiveMetadataFetcher();
- String tableName = "cause";
-
- // mock hive table returned from hive client
- List<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("field1", "string", null));
- fields.add(new FieldSchema("field2", "int", null));
- StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(fields);
- Table hiveTable = new Table();
- hiveTable.setTableType("MANAGED_TABLE");
- hiveTable.setSd(sd);
- hiveTable.setPartitionKeys(new ArrayList<FieldSchema>());
- when(hiveClient.getTable("default", tableName)).thenReturn(hiveTable);
-
- // get metadata
- metadata = fetcher.getTableMetadata(tableName);
-
- assertEquals("default.cause", metadata.getTable().toString());
-
- List<Metadata.Field> resultFields = metadata.getFields();
- assertNotNull(resultFields);
- assertEquals(2, resultFields.size());
- Metadata.Field field = resultFields.get(0);
- assertEquals("field1", field.getName());
- assertEquals("text", field.getType()); // converted type
- field = resultFields.get(1);
- assertEquals("field2", field.getName());
- assertEquals("int4", field.getType());
- }
-
- private void prepareConstruction() throws Exception {
- hiveConfiguration = mock(HiveConf.class);
- PowerMockito.whenNew(HiveConf.class).withNoArguments().thenReturn(hiveConfiguration);
-
- hiveClient = mock(HiveMetaStoreClient.class);
- PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration).thenReturn(hiveClient);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilitiesTest.java b/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
deleted file mode 100644
index e193581..0000000
--- a/pxf/pxf-hive/src/test/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilitiesTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package com.pivotal.pxf.plugins.hive.utilities;
-
-import static org.junit.Assert.*;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.junit.Test;
-
-import com.pivotal.pxf.api.Metadata;
-import com.pivotal.pxf.api.UnsupportedTypeException;
-
-public class HiveUtilitiesTest {
-
- FieldSchema hiveColumn;
- Metadata.Table tblDesc;
-
- static String[][] typesMappings = {
- /* hive type -> hawq type */
- {"tinyint", "int2"},
- {"smallint", "int2"},
- {"int", "int4"},
- {"bigint", "int8"},
- {"boolean", "bool"},
- {"float", "float4"},
- {"double", "float8"},
- {"string", "text"},
- {"binary", "bytea"},
- {"timestamp", "timestamp"},
- {"date", "date"},
- };
-
- static String[][] typesWithModifiers = {
- {"decimal(19,84)", "numeric", "19,84"},
- {"varchar(13)", "varchar", "13"},
- {"char(40)", "bpchar", "40"},
- };
-
- @Test
- public void mapHiveTypeUnsupported() throws Exception {
-
- hiveColumn = new FieldSchema("complex", "array", null);
-
- try {
- HiveUtilities.mapHiveType(hiveColumn);
- fail("unsupported type");
- } catch (UnsupportedTypeException e) {
- assertEquals("HAWQ does not support type " + hiveColumn.getType() + " (Field " + hiveColumn.getName() + ")",
- e.getMessage());
- }
- }
-
- @Test
- public void mapHiveTypeSimple() throws Exception {
- /*
- * tinyint -> int2
- * smallint -> int2
- * int -> int4
- * bigint -> int8
- * boolean -> bool
- * float -> float4
- * double -> float8
- * string -> text
- * binary -> bytea
- * timestamp -> timestamp
- * date -> date
- */
- for (String[] line: typesMappings) {
- String hiveType = line[0];
- String expectedType = line[1];
- hiveColumn = new FieldSchema("field" + hiveType, hiveType, null);
- Metadata.Field result = HiveUtilities.mapHiveType(hiveColumn);
- assertEquals("field" + hiveType, result.getName());
- assertEquals(expectedType, result.getType());
- assertNull(result.getModifiers());
- }
- }
-
- @Test
- public void mapHiveTypeWithModifiers() throws Exception {
- /*
- * decimal -> numeric
- * varchar -> varchar
- * char -> bpchar
- */
- for (String[] line: typesWithModifiers) {
- String hiveType = line[0];
- String expectedType = line[1];
- String modifiersStr = line[2];
- String[] expectedModifiers = modifiersStr.split(",");
- hiveColumn = new FieldSchema("field" + hiveType, hiveType, null);
- Metadata.Field result = HiveUtilities.mapHiveType(hiveColumn);
- assertEquals("field" + hiveType, result.getName());
- assertEquals(expectedType, result.getType());
- assertArrayEquals(expectedModifiers, result.getModifiers());
- }
- }
-
- @Test
- public void mapHiveTypeWithModifiersNegative() throws Exception {
-
- String badHiveType = "decimal(2)";
- hiveColumn = new FieldSchema("badNumeric", badHiveType, null);
- try {
- HiveUtilities.mapHiveType(hiveColumn);
- fail("should fail with bad numeric type error");
- } catch (UnsupportedTypeException e) {
- String errorMsg = "HAWQ does not support type " + badHiveType + " (Field badNumeric), " +
- "expected type of the form <type name>(<parameter>,<parameter>)";
- assertEquals(errorMsg, e.getMessage());
- }
-
- badHiveType = "char(1,2,3)";
- hiveColumn = new FieldSchema("badChar", badHiveType, null);
- try {
- HiveUtilities.mapHiveType(hiveColumn);
- fail("should fail with bad char type error");
- } catch (UnsupportedTypeException e) {
- String errorMsg = "HAWQ does not support type " + badHiveType + " (Field badChar), " +
- "expected type of the form <type name>(<parameter>)";
- assertEquals(errorMsg, e.getMessage());
- }
-
- badHiveType = "char(acter)";
- hiveColumn = new FieldSchema("badModifier", badHiveType, null);
- try {
- HiveUtilities.mapHiveType(hiveColumn);
- fail("should fail with bad modifier error");
- } catch (UnsupportedTypeException e) {
- String errorMsg = "HAWQ does not support type " + badHiveType + " (Field badModifier), " +
- "modifiers should be integers";
- assertEquals(errorMsg, e.getMessage());
- }
- }
-
- @Test
- public void parseTableQualifiedNameNoDbName() throws Exception {
- String name = "orphan";
- tblDesc = HiveUtilities.parseTableQualifiedName(name);
-
- assertEquals("default", tblDesc.getDbName());
- assertEquals(name, tblDesc.getTableName());
- }
-
- @Test
- public void parseTableQualifiedName() throws Exception {
- String name = "not.orphan";
- tblDesc = HiveUtilities.parseTableQualifiedName(name);
-
- assertEquals("not", tblDesc.getDbName());
- assertEquals("orphan", tblDesc.getTableName());
- }
-
- @Test
- public void parseTableQualifiedNameTooManyQualifiers() throws Exception {
- String name = "too.many.parents";
- String errorMsg = surroundByQuotes(name) + " is not a valid Hive table name. "
- + "Should be either <table_name> or <db_name.table_name>";
-
- parseTableQualifiedNameNegative(name, errorMsg, "too many qualifiers");
- }
-
- @Test
- public void parseTableQualifiedNameEmpty() throws Exception {
- String name = "";
- String errorMsg = "empty string is not a valid Hive table name. "
- + "Should be either <table_name> or <db_name.table_name>";
-
- parseTableQualifiedNameNegative(name, errorMsg, "empty string");
-
- name = null;
- parseTableQualifiedNameNegative(name, errorMsg, "null string");
-
- name = ".";
- errorMsg = surroundByQuotes(name) + " is not a valid Hive table name. "
- + "Should be either <table_name> or <db_name.table_name>";
- parseTableQualifiedNameNegative(name, errorMsg, "empty db and table names");
-
- name = " . ";
- errorMsg = surroundByQuotes(name) + " is not a valid Hive table name. "
- + "Should be either <table_name> or <db_name.table_name>";
- parseTableQualifiedNameNegative(name, errorMsg, "only white spaces in string");
- }
-
- private String surroundByQuotes(String str) {
- return "\"" + str + "\"";
- }
-
- private void parseTableQualifiedNameNegative(String name, String errorMsg, String reason) throws Exception {
- try {
- tblDesc = HiveUtilities.parseTableQualifiedName(name);
- fail("test should fail because of " + reason);
- } catch (IllegalArgumentException e) {
- assertEquals(errorMsg, e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
new file mode 100755
index 0000000..b375d65
--- /dev/null
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenterTest.java
@@ -0,0 +1,86 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({HiveDataFragmenter.class}) // Enables mocking 'new' calls
+@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf",
+ "org.apache.hadoop.hive.metastore.api.MetaException",
+ "org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
+public class HiveDataFragmenterTest {
+ InputData inputData;
+ Configuration hadoopConfiguration;
+ JobConf jobConf;
+ HiveConf hiveConfiguration;
+ HiveMetaStoreClient hiveClient;
+ HiveDataFragmenter fragmenter;
+
+ @Test
+ public void construction() throws Exception {
+ prepareConstruction();
+ fragmenter = new HiveDataFragmenter(inputData);
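+ // the constructor is expected to build a JobConf and a metastore client from the mocked configurations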
+ PowerMockito.verifyNew(JobConf.class).withArguments(hadoopConfiguration, HiveDataFragmenter.class);
+ PowerMockito.verifyNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration);
+ }
+
+ @Test
+ public void constructorCantAccessMetaStore() throws Exception {
+ prepareConstruction();
+ PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration).thenThrow(new MetaException("which way to albuquerque"));
+
+ try {
+ fragmenter = new HiveDataFragmenter(inputData);
+ fail("Expected a RuntimeException");
+ } catch (RuntimeException ex) {
+ assertEquals(ex.getMessage(), "Failed connecting to Hive MetaStore service: which way to albuquerque");
+ }
+ }
+
+ @Test
+ public void invalidTableName() throws Exception {
+ prepareConstruction();
+ fragmenter = new HiveDataFragmenter(inputData);
+
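+ // more than one dot means the name cannot be parsed as <table_name> or <db_name.table_name>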
+ when(inputData.getDataSource()).thenReturn("t.r.o.u.b.l.e.m.a.k.e.r");
+
+ try {
+ fragmenter.getFragments();
+ fail("Expected an IllegalArgumentException");
+ } catch (IllegalArgumentException ex) {
+ assertEquals(ex.getMessage(), "\"t.r.o.u.b.l.e.m.a.k.e.r\" is not a valid Hive table name. Should be either <table_name> or <db_name.table_name>");
+ }
+ }
+
+ private void prepareConstruction() throws Exception {
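+ // intercept the 'new' calls the fragmenter makes internally, so it can be
+ // constructed without a live Hadoop cluster or Hive metastore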
+ inputData = mock(InputData.class);
+
+ hadoopConfiguration = mock(Configuration.class);
+ PowerMockito.whenNew(Configuration.class).withNoArguments().thenReturn(hadoopConfiguration);
+
+ jobConf = mock(JobConf.class);
+ PowerMockito.whenNew(JobConf.class).withArguments(hadoopConfiguration, HiveDataFragmenter.class).thenReturn(jobConf);
+
+ hiveConfiguration = mock(HiveConf.class);
+ PowerMockito.whenNew(HiveConf.class).withNoArguments().thenReturn(hiveConfiguration);
+
+ hiveClient = mock(HiveMetaStoreClient.class);
+ PowerMockito.whenNew(HiveMetaStoreClient.class).withArguments(hiveConfiguration).thenReturn(hiveClient);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilderTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilderTest.java
new file mode 100755
index 0000000..4561cf0
--- /dev/null
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilderTest.java
@@ -0,0 +1,30 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.hawq.pxf.api.FilterParser.BasicFilter;
+import static org.apache.hawq.pxf.api.FilterParser.Operation;
+import static org.apache.hawq.pxf.api.FilterParser.Operation.*;
+import static org.junit.Assert.assertEquals;
+
+public class HiveFilterBuilderTest {
+ @Test
+ public void parseFilterWithThreeOperations() throws Exception {
+ HiveFilterBuilder builder = new HiveFilterBuilder(null);
+ String[] consts = new String[] {"first", "2", "3"};
+ Operation[] ops = new Operation[] {HDOP_EQ, HDOP_GT, HDOP_LT};
+ int[] idx = new int[] {1, 2, 3};
+
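+ // the filter string is the serialized pushed-down filter in RPN form:
+ // a<i> pushes column i, c<v> pushes a constant, o<n> applies an operation
+ // (here o5 = EQ, o2 = GT, o1 = LT and o7 ANDs the operands), i.e.
+ // (a1 = "first") AND (a2 > 2) AND (a3 < 3)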
+ @SuppressWarnings("unchecked")
+ List<BasicFilter> filterList = (List<BasicFilter>) builder.getFilterObject("a1c\"first\"o5a2c2o2o7a3c3o1o7");
+ assertEquals(consts.length, filterList.size());
+ for (int i = 0; i < filterList.size(); i++) {
+ BasicFilter filter = filterList.get(i);
+ assertEquals(consts[i], filter.getConstant().constant().toString());
+ assertEquals(ops[i], filter.getOperation());
+ assertEquals(idx[i], filter.getColumn().index());
+ }
+ }
+}
\ No newline at end of file