Posted to commits@hawq.apache.org by sh...@apache.org on 2015/10/28 23:09:58 UTC
[08/15] incubator-hawq git commit: HAWQ-45. PXF Package Namespace change
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
deleted file mode 100644
index 462c7a2..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.FilterParser;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Accessor for Hive tables. The accessor will open and read a split belonging
- * to a Hive table. Opening a split means creating the corresponding InputFormat
- * and RecordReader required to access the split's data. The actual record
- * reading is done in the base class -
- * {@link com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor}. <br>
- * HiveAccessor also enforces Hive partition filtering by filtering out any
- * split that does not match the partition filter. Naturally, partition
- * filtering is done only for Hive tables that are partitioned.
- */
-public class HiveAccessor extends HdfsSplittableDataAccessor {
- private static final Log LOG = LogFactory.getLog(HiveAccessor.class);
- List<HivePartition> partitions;
-
- class HivePartition {
- public String name;
- public String type;
- public String val;
-
- HivePartition(String name, String type, String val) {
- this.name = name;
- this.type = type;
- this.val = val;
- }
- }
-
- protected Boolean filterInFragmenter = false;
-
- /**
- * Constructs a HiveAccessor and creates an InputFormat (derived from
- * {@link org.apache.hadoop.mapred.InputFormat}) and the Hive partition
- * fields
- *
- * @param input contains the InputFormat class name and the partition fields
- * @throws Exception if failed to create input format
- */
- public HiveAccessor(InputData input) throws Exception {
- /*
- * Unfortunately, Java does not allow us to call a function before
- * calling the base constructor, otherwise it would have been:
- * super(input, createInputFormat(input))
- */
- super(input, null);
- inputFormat = createInputFormat(input);
- }
-
- /**
- * Constructs a HiveAccessor
- *
- * @param input contains the InputFormat class name and the partition fields
- * @param inputFormat Hive InputFormat
- */
- public HiveAccessor(InputData input, InputFormat<?, ?> inputFormat) {
- super(input, inputFormat);
- }
-
- /**
- * Opens the Hive data split for reading. Applies Hive partition filtering. <br>
- *
- * @return true if there are no partitions, or there is no partition
- * filter, or a partition filter is set and the file currently opened
- * by the accessor belongs to a matching partition.
- * @throws Exception if filter could not be built, connection to Hive failed
- * or resource failed to open
- */
- @Override
- public boolean openForRead() throws Exception {
- return isOurDataInsideFilteredPartition() && super.openForRead();
- }
-
- /**
- * Creates the RecordReader suitable for the given split.
- *
- * @param jobConf configuration data for the Hadoop framework
- * @param split the split that was allocated for reading to this accessor
- * @return record reader
- * @throws IOException if failed to create record reader
- */
- @Override
- protected Object getReader(JobConf jobConf, InputSplit split)
- throws IOException {
- return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
- }
-
- /*
- * Parses the user data supplied by the HiveDataFragmenter via InputData.
- * Based on the user data, constructs the partition fields and the
- * InputFormat for the current split.
- */
- private InputFormat<?, ?> createInputFormat(InputData input)
- throws Exception {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
- initPartitionFields(toks[3]);
- filterInFragmenter = new Boolean(toks[4]);
- return HiveDataFragmenter.makeInputFormat(
- toks[0]/* inputFormat name */, jobConf);
- }
-
- /*
- * The partition fields are initialized once, based on the userData
- * provided by the fragmenter.
- */
- void initPartitionFields(String partitionKeys) {
- partitions = new LinkedList<HivePartition>();
- if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
- return;
- }
-
- String[] partitionLevels = partitionKeys.split(HiveDataFragmenter.HIVE_PARTITIONS_DELIM);
- for (String partLevel : partitionLevels) {
- String[] levelKey = partLevel.split(HiveDataFragmenter.HIVE_1_PART_DELIM);
- String name = levelKey[0];
- String type = levelKey[1];
- String val = levelKey[2];
- partitions.add(new HivePartition(name, type, val));
- }
- }
-
- private boolean isOurDataInsideFilteredPartition() throws Exception {
- if (!inputData.hasFilter()) {
- return true;
- }
-
- if (filterInFragmenter) {
- LOG.debug("filtering was done in fragmenter");
- return true;
- }
-
- String filterStr = inputData.getFilterString();
- HiveFilterBuilder eval = new HiveFilterBuilder(inputData);
- Object filter = eval.getFilterObject(filterStr);
-
- boolean returnData = isFiltered(partitions, filter);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("segmentId: " + inputData.getSegmentId() + " "
- + inputData.getDataSource() + "--" + filterStr
- + " returnData: " + returnData);
- if (filter instanceof List) {
- for (Object f : (List<?>) filter) {
- printOneBasicFilter(f);
- }
- } else {
- printOneBasicFilter(filter);
- }
- }
-
- return returnData;
- }
-
- private boolean isFiltered(List<HivePartition> partitionFields,
- Object filter) {
- if (filter instanceof List) {
- /*
- * We go over each filter in the list and test it against all the
- * partition fields. Since filters are connected only by AND
- * operators, it is enough for one filter to fail in order to deny
- * this data.
- */
- for (Object f : (List<?>) filter) {
- if (!testOneFilter(partitionFields, f, inputData)) {
- return false;
- }
- }
- return true;
- }
-
- return testOneFilter(partitionFields, filter, inputData);
- }
-
- /*
- * We test one filter against all the partition fields. The filter has
- * the form "fieldA = valueA". The partitions have the form
- * partitionOne=valueOne/partitionTwo=valueTwo/partitionThree=valueThree.
- * 1. For a filter to match a partition, say partitionOne, we need:
- * fieldA = partitionOne and valueA = valueOne; then we return true.
- * 2. If fieldA does not match any of the partition fields, we also
- * return true - we ignore this filter because it is not on a partition
- * field.
- * 3. If fieldA = partitionOne and valueA != valueOne, we return false.
- */
- private boolean testOneFilter(List<HivePartition> partitionFields,
- Object filter, InputData input) {
- // Let's look first at the filter
- FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
-
- boolean isFilterOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
- /*
- * in case this is not an "equality filter", we ignore it here - in
- * partition filtering
- */
- if (!isFilterOperationEqual) {
- return true;
- }
-
- int filterColumnIndex = bFilter.getColumn().index();
- String filterValue = bFilter.getConstant().constant().toString();
- ColumnDescriptor filterColumn = input.getColumn(filterColumnIndex);
- String filterColumnName = filterColumn.columnName();
-
- for (HivePartition partition : partitionFields) {
- if (filterColumnName.equals(partition.name)) {
- /*
- * the filter field matches a partition field; the filter passes
- * only if the values match as well
- */
- return filterValue.equals(partition.val);
- }
- }
-
- /*
- * filter field did not match any partition field, so we ignore this
- * filter and hence return true
- */
- return true;
- }
-
- private void printOneBasicFilter(Object filter) {
- FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
- boolean isOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
- int columnIndex = bFilter.getColumn().index();
- String value = bFilter.getConstant().constant().toString();
- LOG.debug("isOperationEqual: " + isOperationEqual + " columnIndex: "
- + columnIndex + " value: " + value);
- }
-}
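The partition-filtering rules documented on testOneFilter() above can be stated compactly. Here is a minimal, self-contained sketch of the same decision logic; class and variable names are illustrative, and plain strings stand in for the PXF FilterParser types:

    import java.util.Arrays;
    import java.util.List;

    public class PartitionFilterSketch {
        // Simplified restatement of testOneFilter: an equality filter on a
        // partition column passes only when the values match; every other
        // filter is ignored, i.e. treated as passing.
        static boolean testOneFilter(List<String[]> partitions, String filterName,
                                     String filterValue, boolean isEquality) {
            if (!isEquality) {
                return true;                  // non-equality filters are ignored
            }
            for (String[] p : partitions) {   // p[0] = name, p[1] = value
                if (filterName.equals(p[0])) {
                    return filterValue.equals(p[1]);
                }
            }
            return true;                      // not a partition column -> ignore
        }

        public static void main(String[] args) {
            List<String[]> parts = Arrays.asList(
                    new String[] { "part1", "AAAA" },
                    new String[] { "part2", "1111" });
            System.out.println(testOneFilter(parts, "part1", "AAAA", true)); // true
            System.out.println(testOneFilter(parts, "part1", "BBBB", true)); // false
            System.out.println(testOneFilter(parts, "other", "x", true));    // true
        }
    }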
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveColumnarSerdeResolver.java
deleted file mode 100644
index 0517247..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.BadRecordException;
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.UnsupportedTypeException;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.service.utilities.Utilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDeBase;
-import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import static com.pivotal.pxf.api.io.DataType.VARCHAR;
-
-/**
- * Specialized HiveResolver for a Hive table stored as an RC file.
- * Use together with HiveInputFormatFragmenter/HiveRCFileAccessor.
- */
-public class HiveColumnarSerdeResolver extends HiveResolver {
- private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
- private ColumnarSerDeBase deserializer;
- private boolean firstColumn;
- private StringBuilder builder;
- private StringBuilder parts;
- private int numberOfPartitions;
- private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
-
- public HiveColumnarSerdeResolver(InputData input) throws Exception {
- super(input);
- }
-
- /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
- @Override
- void parseUserData(InputData input) throws Exception {
- String[] toks = HiveInputFormatFragmenter.parseToks(input);
- String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
- if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
- } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
- serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
- } else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
- }
- parts = new StringBuilder();
- partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
- parseDelimiterChar(input);
- }
-
- @Override
- void initPartitionFields() {
- numberOfPartitions = initPartitionFields(parts);
- }
-
- /**
- * Returns a singleton list of one OneField item. The OneField item
- * contains two fields: an integer representing the VARCHAR type and a
- * Java Object representing the field value.
- */
- @Override
- public List<OneField> getFields(OneRow onerow) throws Exception {
- firstColumn = true;
- builder = new StringBuilder();
- Object tuple = deserializer.deserialize((Writable) onerow.getData());
- ObjectInspector oi = deserializer.getObjectInspector();
-
- traverseTuple(tuple, oi);
- /* We follow Hive convention. Partition fields are always added at the end of the record */
- builder.append(parts);
- return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
- }
-
- /*
- * Gets and initializes the deserializer for the records of this Hive data fragment.
- * @SuppressWarnings is added because deserializer.initialize is a deprecated abstract function,
- * but its implementations (ColumnarSerDe, LazyBinaryColumnarSerDe) still use the deprecated interface.
- */
- @SuppressWarnings("deprecation")
- @Override
- void initSerde(InputData input) throws Exception {
- Properties serdeProperties = new Properties();
- int numberOfDataColumns = input.getColumns() - numberOfPartitions;
-
- LOG.debug("Serde number of columns is " + numberOfDataColumns);
-
- StringBuilder columnNames = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
- StringBuilder columnTypes = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
- String delim = "";
- for (int i = 0; i < numberOfDataColumns; i++) {
- ColumnDescriptor column = input.getColumn(i);
- String columnName = column.columnName();
- String columnType = HiveInputFormatFragmenter.toHiveType(DataType.get(column.columnTypeCode()), columnName);
- columnNames.append(delim).append(columnName);
- columnTypes.append(delim).append(columnType);
- delim = ",";
- }
- serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
- serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
-
- if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
- deserializer = new ColumnarSerDe();
- } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
- deserializer = new LazyBinaryColumnarSerDe();
- } else {
- throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
- }
-
- deserializer.initialize(new JobConf(new Configuration(), HiveColumnarSerdeResolver.class), serdeProperties);
- }
-
- /**
- * Handle a Hive record.
- * Supported object categories:
- * Primitive - including NULL
- * Struct (used by ColumnarSerDe to store primitives) - cannot be NULL
- * <p/>
- * Any other category will throw UnsupportedTypeException
- */
- private void traverseTuple(Object obj, ObjectInspector objInspector) throws IOException, BadRecordException {
- ObjectInspector.Category category = objInspector.getCategory();
- if ((obj == null) && (category != ObjectInspector.Category.PRIMITIVE)) {
- throw new BadRecordException("NULL Hive composite object");
- }
- switch (category) {
- case PRIMITIVE:
- resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector);
- break;
- case STRUCT:
- StructObjectInspector soi = (StructObjectInspector) objInspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- List<?> list = soi.getStructFieldsDataAsList(obj);
- if (list == null) {
- throw new BadRecordException("Illegal value NULL for Hive data type Struct");
- }
- for (int i = 0; i < list.size(); i++) {
- traverseTuple(list.get(i), fields.get(i).getFieldObjectInspector());
- }
- break;
- default:
- throw new UnsupportedTypeException("Hive object category: " + objInspector.getCategory() + " unsupported");
- }
- }
-
- private void resolvePrimitive(Object o, PrimitiveObjectInspector oi) throws IOException {
-
- if (!firstColumn) {
- builder.append(delimiter);
- }
-
- if (o == null) {
- builder.append(nullChar);
- } else {
- switch (oi.getPrimitiveCategory()) {
- case BOOLEAN:
- builder.append(((BooleanObjectInspector) oi).get(o));
- break;
- case SHORT:
- builder.append(((ShortObjectInspector) oi).get(o));
- break;
- case INT:
- builder.append(((IntObjectInspector) oi).get(o));
- break;
- case LONG:
- builder.append(((LongObjectInspector) oi).get(o));
- break;
- case FLOAT:
- builder.append(((FloatObjectInspector) oi).get(o));
- break;
- case DOUBLE:
- builder.append(((DoubleObjectInspector) oi).get(o));
- break;
- case DECIMAL:
- builder.append(((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o).bigDecimalValue());
- break;
- case STRING:
- builder.append(((StringObjectInspector) oi).getPrimitiveJavaObject(o));
- break;
- case BINARY:
- byte[] bytes = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o);
- Utilities.byteArrayToOctalString(bytes, builder);
- break;
- case TIMESTAMP:
- builder.append(((TimestampObjectInspector) oi).getPrimitiveJavaObject(o));
- break;
- case BYTE: /* TINYINT */
- builder.append(new Short(((ByteObjectInspector) oi).get(o)));
- break;
- default:
- throw new UnsupportedTypeException(oi.getTypeName()
- + " conversion is not supported by HiveColumnarSerdeResolver");
- }
- }
- firstColumn = false;
- }
-}
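To make the flattening done by getFields() and resolvePrimitive() above concrete: each primitive value is appended to a single delimited string, nulls become the null character, and the pre-built partition string is appended last, per the Hive convention. A minimal sketch, assuming a comma delimiter, 'N' as the null character and one partition value (all hypothetical):

    public class RecordFlattenSketch {
        public static void main(String[] args) {
            char delimiter = ',';        // assumed parsed delimiter
            char nullChar = 'N';         // assumed null placeholder
            String parts = ",2015";      // assumed pre-built partition suffix
            Object[] row = { true, 42, null, "foo" };

            StringBuilder builder = new StringBuilder();
            boolean firstColumn = true;
            for (Object field : row) {
                if (!firstColumn) {
                    builder.append(delimiter);
                }
                builder.append(field == null ? nullChar : field);
                firstColumn = false;
            }
            builder.append(parts);       // partitions go at the end
            System.out.println(builder); // true,42,N,foo,2015
        }
    }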
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
deleted file mode 100644
index 6ebc62e..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
+++ /dev/null
@@ -1,446 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import java.io.ByteArrayOutputStream;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.pivotal.pxf.api.FilterParser;
-import com.pivotal.pxf.api.Fragment;
-import com.pivotal.pxf.api.Fragmenter;
-import com.pivotal.pxf.api.Metadata;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.plugins.hive.utilities.HiveUtilities;
-
-/**
- * Fragmenter class for HIVE tables.
- * <br>
- * Given a Hive table and its partitions, divide the data into fragments (here a
- * data fragment is actually an HDFS file block) and return a list of them. Each
- * data fragment will contain the following information:
- * <ol>
- * <li>sourceName: full HDFS path to the data file that this data fragment is
- * part of</li>
- * <li>hosts: a list of the datanode machines that hold a replica of this block</li>
- * <li>userData:
- * file_input_format_name_DELIM_serde_name_DELIM_serialization_properties</li>
- * </ol>
- */
-public class HiveDataFragmenter extends Fragmenter {
- private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class);
- private static final short ALL_PARTS = -1;
-
- static final String HIVE_UD_DELIM = "!HUDD!";
- static final String HIVE_1_PART_DELIM = "!H1PD!";
- static final String HIVE_PARTITIONS_DELIM = "!HPAD!";
- static final String HIVE_NO_PART_TBL = "!HNPT!";
-
- static final String HIVE_API_EQ = " = ";
- static final String HIVE_API_DQUOTE = "\"";
-
- private JobConf jobConf;
- private HiveMetaStoreClient client;
-
- protected boolean filterInFragmenter = false;
-
- // Data structure to hold hive partition names if exist, to be used by
- // partition filtering
- private Set<String> setPartitions = new TreeSet<String>(
- String.CASE_INSENSITIVE_ORDER);
-
- /**
- * A Hive table unit - a subset of the Hive table in which all files
- * share the same InputFormat and serde. For a partitioned table, the
- * HiveTablePartition will be one partition; for an unpartitioned
- * table, it will be the whole table.
- */
- class HiveTablePartition {
- StorageDescriptor storageDesc;
- Properties properties;
- Partition partition;
- List<FieldSchema> partitionKeys;
- String tableName;
-
- HiveTablePartition(StorageDescriptor storageDesc,
- Properties properties, Partition partition,
- List<FieldSchema> partitionKeys, String tableName) {
- this.storageDesc = storageDesc;
- this.properties = properties;
- this.partition = partition;
- this.partitionKeys = partitionKeys;
- this.tableName = tableName;
- }
-
- @Override
- public String toString() {
- return "table - " + tableName
- + ((partition == null) ? "" : ", partition - " + partition);
- }
- }
-
- /**
- * Constructs a HiveDataFragmenter object.
- *
- * @param inputData all input parameters coming from the client
- */
- public HiveDataFragmenter(InputData inputData) {
- this(inputData, HiveDataFragmenter.class);
- }
-
- /**
- * Constructs a HiveDataFragmenter object.
- *
- * @param inputData all input parameters coming from the client
- * @param clazz Class for JobConf
- */
- public HiveDataFragmenter(InputData inputData, Class<?> clazz) {
- super(inputData);
- jobConf = new JobConf(new Configuration(), clazz);
- client = HiveUtilities.initHiveClient();
- }
-
- @Override
- public List<Fragment> getFragments() throws Exception {
- Metadata.Table tblDesc = HiveUtilities.parseTableQualifiedName(inputData.getDataSource());
-
- fetchTableMetaData(tblDesc);
-
- return fragments;
- }
-
- /**
- * Creates the partition InputFormat.
- *
- * @param inputFormatName input format class name
- * @param jobConf configuration data for the Hadoop framework
- * @return a {@link org.apache.hadoop.mapred.InputFormat} derived object
- * @throws Exception if failed to create input format
- */
- public static InputFormat<?, ?> makeInputFormat(String inputFormatName,
- JobConf jobConf)
- throws Exception {
- Class<?> c = Class.forName(inputFormatName, true,
- JavaUtils.getClassLoader());
- InputFormat<?, ?> inputFormat = (InputFormat<?, ?>) c.newInstance();
-
- if ("org.apache.hadoop.mapred.TextInputFormat".equals(inputFormatName)) {
- // TextInputFormat needs a special configuration
- ((TextInputFormat) inputFormat).configure(jobConf);
- }
-
- return inputFormat;
- }
-
- /*
- * Goes over the table partitions metadata and extracts the splits and the
- * InputFormat and Serde per split.
- */
- private void fetchTableMetaData(Metadata.Table tblDesc) throws Exception {
-
- Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
-
- verifySchema(tbl);
-
- List<Partition> partitions = null;
- String filterStringForHive = "";
-
- // If the query has a filter and the Hive table has partitions, prepare
- // the filter string for the Hive metastore and retrieve only matching partitions
- if (inputData.hasFilter() && tbl.getPartitionKeysSize() > 0) {
-
- // Save all hive partition names in a set for later filter match
- for (FieldSchema fs : tbl.getPartitionKeys()) {
- setPartitions.add(fs.getName());
- }
-
- LOG.debug("setPartitions :" + setPartitions);
-
- // Generate the filter string used to retrieve only the partitions
- // matching the PXF filter
- filterStringForHive = buildFilterStringForHive();
- }
-
- if (!filterStringForHive.isEmpty()) {
-
- LOG.debug("Filter String for Hive partition retrieval : "
- + filterStringForHive);
-
- filterInFragmenter = true;
-
- // API call to the Hive metastore; returns a list of all the
- // partitions of this table that match the partition filters
- // defined in filterStringForHive.
- partitions = client.listPartitionsByFilter(tblDesc.getDbName(),
- tblDesc.getTableName(), filterStringForHive, ALL_PARTS);
-
- // No matched partitions for the filter, no fragments to return.
- if (partitions == null || partitions.isEmpty()) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table - " + tblDesc.getDbName() + "."
- + tblDesc.getTableName()
- + " Has no matched partitions for the filter : "
- + filterStringForHive);
- }
- return;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Table - " + tblDesc.getDbName() + "."
- + tblDesc.getTableName()
- + " Matched partitions list size: " + partitions.size());
- }
-
- } else {
- // API call to the Hive metastore; returns a list of all the
- // partitions of this table (no filtering)
- partitions = client.listPartitions(tblDesc.getDbName(),
- tblDesc.getTableName(), ALL_PARTS);
- }
-
- StorageDescriptor descTable = tbl.getSd();
- Properties props;
-
- if (partitions.isEmpty()) {
- props = getSchema(tbl);
- fetchMetaDataForSimpleTable(descTable, props);
- } else {
- List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
-
- for (Partition partition : partitions) {
- StorageDescriptor descPartition = partition.getSd();
- props = MetaStoreUtils.getSchema(descPartition, descTable,
- null, // Map<string, string> parameters - can be empty
- tblDesc.getDbName(), tblDesc.getTableName(), // table name
- partitionKeys);
- fetchMetaDataForPartitionedTable(descPartition, props,
- partition, partitionKeys, tblDesc.getTableName());
- }
- }
- }
-
- void verifySchema(Table tbl) throws Exception {
- /* nothing to verify here */
- }
-
- private static Properties getSchema(Table table) {
- return MetaStoreUtils.getSchema(table.getSd(), table.getSd(),
- table.getParameters(), table.getDbName(), table.getTableName(),
- table.getPartitionKeys());
- }
-
- private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
- Properties props) throws Exception {
- fetchMetaDataForSimpleTable(stdsc, props, null);
- }
-
- private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
- Properties props, String tableName)
- throws Exception {
- fetchMetaData(new HiveTablePartition(stdsc, props, null, null,
- tableName));
- }
-
- private void fetchMetaDataForPartitionedTable(StorageDescriptor stdsc,
- Properties props,
- Partition partition,
- List<FieldSchema> partitionKeys,
- String tableName)
- throws Exception {
- fetchMetaData(new HiveTablePartition(stdsc, props, partition,
- partitionKeys, tableName));
- }
-
- /* Creates the fragments for a single table partition */
- private void fetchMetaData(HiveTablePartition tablePartition)
- throws Exception {
- InputFormat<?, ?> fformat = makeInputFormat(
- tablePartition.storageDesc.getInputFormat(), jobConf);
- FileInputFormat.setInputPaths(jobConf, new Path(
- tablePartition.storageDesc.getLocation()));
-
- InputSplit[] splits = null;
- try {
- splits = fformat.getSplits(jobConf, 1);
- } catch (org.apache.hadoop.mapred.InvalidInputException e) {
- LOG.debug("getSplits failed on " + e.getMessage());
- return;
- }
-
- for (InputSplit split : splits) {
- FileSplit fsp = (FileSplit) split;
- String[] hosts = fsp.getLocations();
- String filepath = fsp.getPath().toUri().getPath();
-
- byte[] locationInfo = HdfsUtilities.prepareFragmentMetadata(fsp);
- Fragment fragment = new Fragment(filepath, hosts, locationInfo,
- makeUserData(tablePartition));
- fragments.add(fragment);
- }
- }
-
- /* Turns a Properties class into a string */
- private String serializeProperties(Properties props) throws Exception {
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- props.store(outStream, ""/* comments */);
- return outStream.toString();
- }
-
- /* Turns the partition keys into a string */
- String serializePartitionKeys(HiveTablePartition partData) throws Exception {
- /* this is a simple hive table - there are no partitions */
- if (partData.partition == null) {
- return HIVE_NO_PART_TBL;
- }
-
- StringBuilder partitionKeys = new StringBuilder();
- String prefix = "";
- ListIterator<String> valsIter = partData.partition.getValues().listIterator();
- ListIterator<FieldSchema> keysIter = partData.partitionKeys.listIterator();
- while (valsIter.hasNext() && keysIter.hasNext()) {
- FieldSchema key = keysIter.next();
- String name = key.getName();
- String type = key.getType();
- String val = valsIter.next();
- String oneLevel = prefix + name + HIVE_1_PART_DELIM + type
- + HIVE_1_PART_DELIM + val;
- partitionKeys.append(oneLevel);
- prefix = HIVE_PARTITIONS_DELIM;
- }
-
- return partitionKeys.toString();
- }
-
- byte[] makeUserData(HiveTablePartition partData) throws Exception {
- String inputFormatName = partData.storageDesc.getInputFormat();
- String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
- String propertiesString = serializeProperties(partData.properties);
- String partitionKeys = serializePartitionKeys(partData);
- String userData = inputFormatName + HIVE_UD_DELIM + serdeName
- + HIVE_UD_DELIM + propertiesString + HIVE_UD_DELIM
- + partitionKeys + HIVE_UD_DELIM + filterInFragmenter;
-
- return userData.getBytes();
- }
-
- /*
- * Builds the filter string for the
- * HiveMetaStoreClient.listPartitionsByFilter API method.
- *
- * The filter string parameter for
- * HiveMetaStoreClient.listPartitionsByFilter is created from the
- * incoming getFragments filter string parameter. It has the format:
- * [PARTITION1 NAME] = \"[PARTITION1 VALUE]\" AND [PARTITION2 NAME] =
- * \"[PARTITION2 VALUE]\" ... Filtering can be done only on string
- * partition keys and with AND operators.
- *
- * For example, for the query: SELECT * FROM TABLE1 WHERE part1 = 'AAAA'
- * AND part2 = '1111', the incoming HAWQ filter string is mapped into
- * "part1 = \"AAAA\" and part2 = \"1111\"" for
- * HiveMetaStoreClient.listPartitionsByFilter.
- */
- private String buildFilterStringForHive() throws Exception {
-
- StringBuilder filtersString = new StringBuilder();
- String filterInput = inputData.getFilterString();
-
- if (LOG.isDebugEnabled()) {
-
- for (ColumnDescriptor cd : inputData.getTupleDescription()) {
- LOG.debug("ColumnDescriptor : " + cd);
- }
-
- LOG.debug("Filter string input : " + inputData.getFilterString());
- }
-
- HiveFilterBuilder eval = new HiveFilterBuilder(inputData);
- Object filter = eval.getFilterObject(filterInput);
-
- String prefix = "";
-
- if (filter instanceof List) {
-
- for (Object f : (List<?>) filter) {
- if (buildSingleFilter(f, filtersString, prefix)) {
- // Set 'and' operator between each matched partition filter.
- prefix = " and ";
- }
- }
-
- } else {
- buildSingleFilter(filter, filtersString, prefix);
- }
-
- return filtersString.toString();
- }
-
- /*
- * Builds the filter string for a single filter and appends it to the
- * filters string. The filter is added only if its column name matches
- * a Hive partition name. A single filter has the format:
- * [PARTITION NAME] = \"[PARTITION VALUE]\"
- */
- private boolean buildSingleFilter(Object filter,
- StringBuilder filtersString, String prefix)
- throws Exception {
-
- // Let's look first at the filter
- FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
-
- // In case this is not an "equality filter", we ignore this filter (no
- // add to filter list)
- if (!(bFilter.getOperation() == FilterParser.Operation.HDOP_EQ)) {
- LOG.debug("Filter operator is not EQ, ignore this filter for hive : "
- + filter);
- return false;
- }
-
- // Extract column name and value
- int filterColumnIndex = bFilter.getColumn().index();
- String filterValue = bFilter.getConstant().constant().toString();
- ColumnDescriptor filterColumn = inputData.getColumn(filterColumnIndex);
- String filterColumnName = filterColumn.columnName();
-
- // In case this filter is not a partition, we ignore this filter (no add
- // to filter list)
- if (!setPartitions.contains(filterColumnName)) {
- LOG.debug("Filter name is not a partition , ignore this filter for hive: "
- + filter);
- return false;
- }
-
- filtersString.append(prefix);
- filtersString.append(filterColumnName);
- filtersString.append(HIVE_API_EQ);
- filtersString.append(HIVE_API_DQUOTE);
- filtersString.append(filterValue);
- filtersString.append(HIVE_API_DQUOTE);
-
- return true;
- }
-}
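As a worked example of buildFilterStringForHive() and buildSingleFilter() above: for SELECT * FROM TABLE1 WHERE part1 = 'AAAA' AND part2 = '1111', each matched equality filter is rendered with HIVE_API_EQ and HIVE_API_DQUOTE and joined with " and ". A minimal sketch with the constants inlined (filter names and values hypothetical):

    public class HiveFilterStringSketch {
        public static void main(String[] args) {
            final String HIVE_API_EQ = " = ";
            final String HIVE_API_DQUOTE = "\"";
            String[][] filters = { { "part1", "AAAA" }, { "part2", "1111" } };

            StringBuilder filtersString = new StringBuilder();
            String prefix = "";
            for (String[] f : filters) {
                filtersString.append(prefix).append(f[0]).append(HIVE_API_EQ)
                             .append(HIVE_API_DQUOTE).append(f[1]).append(HIVE_API_DQUOTE);
                prefix = " and ";        // 'and' between matched partition filters
            }
            System.out.println(filtersString); // part1 = "AAAA" and part2 = "1111"
        }
    }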
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilder.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilder.java
deleted file mode 100644
index 1fe3a64..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveFilterBuilder.java
+++ /dev/null
@@ -1,129 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.FilterParser;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Uses the filter parser code to build a filter object, either simple - a
- * single {@link com.pivotal.pxf.api.FilterParser.BasicFilter} object - or
- * compound - a {@link java.util.List} of
- * {@link com.pivotal.pxf.api.FilterParser.BasicFilter} objects.
- * {@link com.pivotal.pxf.plugins.hive.HiveAccessor} will use the filter for
- * partition filtering.
- */
-public class HiveFilterBuilder implements FilterParser.FilterBuilder {
- private InputData inputData;
-
- /**
- * Constructs a HiveFilterBuilder object.
- *
- * @param input input data containing filter string
- */
- public HiveFilterBuilder(InputData input) {
- inputData = input;
- }
-
- /**
- * Translates a filterString into a {@link com.pivotal.pxf.api.FilterParser.BasicFilter} or a
- * list of such filters.
- *
- * @param filterString the string representation of the filter
- * @return a single {@link com.pivotal.pxf.api.FilterParser.BasicFilter}
- * object or a {@link java.util.List} of
- * {@link com.pivotal.pxf.api.FilterParser.BasicFilter} objects.
- * @throws Exception if parsing the filter failed or filter is not a basic
- * filter or list of basic filters
- */
- public Object getFilterObject(String filterString) throws Exception {
- FilterParser parser = new FilterParser(this);
- Object result = parser.parse(filterString);
-
- if (!(result instanceof FilterParser.BasicFilter)
- && !(result instanceof List)) {
- throw new Exception("String " + filterString
- + " resolved to no filter");
- }
-
- return result;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Object build(FilterParser.Operation opId, Object leftOperand,
- Object rightOperand) throws Exception {
- if (leftOperand instanceof FilterParser.BasicFilter
- || leftOperand instanceof List) {
- if (opId != FilterParser.Operation.HDOP_AND
- || !(rightOperand instanceof FilterParser.BasicFilter)) {
- throw new Exception(
- "Only AND is allowed between compound expressions");
- }
-
- if (leftOperand instanceof List) {
- return handleCompoundOperations(
- (List<FilterParser.BasicFilter>) leftOperand,
- (FilterParser.BasicFilter) rightOperand);
- } else {
- return handleCompoundOperations(
- (FilterParser.BasicFilter) leftOperand,
- (FilterParser.BasicFilter) rightOperand);
- }
- }
-
- if (!(rightOperand instanceof FilterParser.Constant)) {
- throw new Exception(
- "expressions of column-op-column are not supported");
- }
-
- // Assume column is on the left
- return handleSimpleOperations(opId,
- (FilterParser.ColumnIndex) leftOperand,
- (FilterParser.Constant) rightOperand);
- }
-
- /*
- * Handles simple column-operator-constant expressions by creating a
- * basic filter.
- */
- private FilterParser.BasicFilter handleSimpleOperations(FilterParser.Operation opId,
- FilterParser.ColumnIndex column,
- FilterParser.Constant constant) {
- return new FilterParser.BasicFilter(opId, column, constant);
- }
-
- /**
- * Handles AND of already calculated expressions. Currently only AND, in the
- * future OR can be added
- *
- * Four cases here:
- * <ol>
- * <li>both are simple filters</li>
- * <li>left is a FilterList and right is a filter</li>
- * <li>left is a filter and right is a FilterList</li>
- * <li>both are FilterLists</li>
- * </ol>
- * Currently only cases 1 and 2 can occur, since no parentheses are used
- *
- * @param left left hand filter
- * @param right right hand filter
- * @return list of filters constructing the filter tree
- */
- private List<FilterParser.BasicFilter> handleCompoundOperations(List<FilterParser.BasicFilter> left,
- FilterParser.BasicFilter right) {
- left.add(right);
- return left;
- }
-
- private List<FilterParser.BasicFilter> handleCompoundOperations(FilterParser.BasicFilter left,
- FilterParser.BasicFilter right) {
- List<FilterParser.BasicFilter> result = new LinkedList<FilterParser.BasicFilter>();
-
- result.add(left);
- result.add(right);
-
- return result;
- }
-}
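The net effect of the two handleCompoundOperations() overloads above is that an AND chain flattens into one list instead of a nested tree. A tiny illustration, with strings standing in for BasicFilter objects:

    import java.util.LinkedList;
    import java.util.List;

    public class AndChainSketch {
        public static void main(String[] args) {
            // f1 AND f2 -> [f1, f2]              (BasicFilter + BasicFilter)
            List<String> filters = new LinkedList<>();
            filters.add("f1");
            filters.add("f2");
            // (f1 AND f2) AND f3 -> [f1, f2, f3] (List + BasicFilter)
            filters.add("f3");
            System.out.println(filters);          // [f1, f2, f3]
        }
    }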
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveInputFormatFragmenter.java
deleted file mode 100644
index 5c51e93..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ /dev/null
@@ -1,262 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.UnsupportedTypeException;
-import com.pivotal.pxf.api.UserDataException;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.ColumnDescriptor;
-import com.pivotal.pxf.api.utilities.InputData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Specialized Hive fragmenter for RC file and text file tables. Unlike the
- * {@link HiveDataFragmenter}, this class does not send the serde properties to
- * the accessor/resolvers. This is done to avoid memory explosion in HAWQ. For
- * RC use together with {@link HiveRCFileAccessor}/
- * {@link HiveColumnarSerdeResolver}. For Text use together with
- * {@link HiveLineBreakAccessor}/{@link HiveStringPassResolver}. <br>
- * Given a Hive table and its partitions, divide the data into fragments (here a
- * data fragment is actually an HDFS file block) and return a list of them. Each
- * data fragment will contain the following information:
- * <ol>
- * <li>sourceName: full HDFS path to the data file that this data fragment is
- * part of</li>
- * <li>hosts: a list of the datanode machines that hold a replica of this block</li>
- * <li>userData: inputformat name, serde names and partition keys</li>
- * </ol>
- */
-public class HiveInputFormatFragmenter extends HiveDataFragmenter {
- private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
-
- static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
- static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
- static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
- static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
- static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
- private static final int EXPECTED_NUM_OF_TOKS = 3;
- public static final int TOK_SERDE = 0;
- public static final int TOK_KEYS = 1;
- public static final int TOK_FILTER_DONE = 2;
-
- /** Defines the Hive input formats currently supported in pxf */
- public enum PXF_HIVE_INPUT_FORMATS {
- RC_FILE_INPUT_FORMAT,
- TEXT_FILE_INPUT_FORMAT
- }
-
- /** Defines the Hive serializers (serde classes) currently supported in pxf */
- public enum PXF_HIVE_SERDES {
- COLUMNAR_SERDE,
- LAZY_BINARY_COLUMNAR_SERDE,
- LAZY_SIMPLE_SERDE
- }
-
- /**
- * Constructs a HiveInputFormatFragmenter.
- *
- * @param inputData all input parameters coming from the client
- */
- public HiveInputFormatFragmenter(InputData inputData) {
- super(inputData, HiveInputFormatFragmenter.class);
- }
-
- /**
- * Extracts the user data:
- * serde, partition keys, and whether the filter was already applied in the fragmenter
- *
- * @param input input data from client
- * @param supportedSerdes supported serde names
- * @return parsed tokens
- * @throws UserDataException if user data contains unsupported serde
- * or wrong number of tokens
- */
- static public String[] parseToks(InputData input, String... supportedSerdes)
- throws UserDataException {
- String userData = new String(input.getFragmentUserData());
- String[] toks = userData.split(HIVE_UD_DELIM);
- if (supportedSerdes.length > 0
- && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) {
- throw new UserDataException(toks[TOK_SERDE]
- + " serializer isn't supported by " + input.getAccessor());
- }
-
- if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
- throw new UserDataException("HiveInputFormatFragmenter expected "
- + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
- }
-
- return toks;
- }
-
- /*
- * Checks that the Hive fields and partitions match the HAWQ schema.
- * Throws an exception if:
- * - the number of fields (+ partitions) does not match the HAWQ table
- * definition, or
- * - the Hive field types do not match the HAWQ field types.
- */
- @Override
- void verifySchema(Table tbl) throws Exception {
-
- int columnsSize = inputData.getColumns();
- int hiveColumnsSize = tbl.getSd().getColsSize();
- int hivePartitionsSize = tbl.getPartitionKeysSize();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive table: " + hiveColumnsSize + " fields, "
- + hivePartitionsSize + " partitions. " + "HAWQ table: "
- + columnsSize + " fields.");
- }
-
- // check schema size
- if (columnsSize != (hiveColumnsSize + hivePartitionsSize)) {
- throw new IllegalArgumentException("Hive table schema ("
- + hiveColumnsSize + " fields, " + hivePartitionsSize
- + " partitions) " + "doesn't match PXF table ("
- + columnsSize + " fields)");
- }
-
- int index = 0;
- // check hive fields
- List<FieldSchema> hiveColumns = tbl.getSd().getCols();
- for (FieldSchema hiveCol : hiveColumns) {
- ColumnDescriptor colDesc = inputData.getColumn(index++);
- DataType colType = DataType.get(colDesc.columnTypeCode());
- compareTypes(colType, hiveCol.getType(), colDesc.columnName());
- }
- // check partition fields
- List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
- for (FieldSchema hivePart : hivePartitions) {
- ColumnDescriptor colDesc = inputData.getColumn(index++);
- DataType colType = DataType.get(colDesc.columnTypeCode());
- compareTypes(colType, hivePart.getType(), colDesc.columnName());
- }
-
- }
-
- private void compareTypes(DataType type, String hiveType, String fieldName) {
- String convertedHive = toHiveType(type, fieldName);
- if (!convertedHive.equals(hiveType)
- && !(convertedHive.equals("smallint") && hiveType.equals("tinyint"))) {
- throw new UnsupportedTypeException(
- "Schema mismatch definition: Field " + fieldName
- + " (Hive type " + hiveType + ", HAWQ type "
- + type.toString() + ")");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Field " + fieldName + ": Hive type " + hiveType
- + ", HAWQ type " + type.toString());
- }
- }
-
- /**
- * Converts HAWQ type to hive type. The supported mappings are:<ul>
- * <li>{@code BOOLEAN -> boolean}</li>
- * <li>{@code SMALLINT -> smallint (tinyint is converted to smallint)}</li>
- * <li>{@code BIGINT -> bigint}</li>
- * <li>{@code TIMESTAMP, TIME -> timestamp}</li>
- * <li>{@code NUMERIC -> decimal}</li>
- * <li>{@code BYTEA -> binary}</li>
- * <li>{@code INTEGER -> int}</li>
- * <li>{@code TEXT -> string}</li>
- * <li>{@code REAL -> float}</li>
- * <li>{@code FLOAT8 -> double}</li>
- * </ul>
- * All other types (both in HAWQ and in HIVE) are not supported.
- *
- * @param type HAWQ data type
- * @param name field name
- * @return Hive type
- * @throws UnsupportedTypeException if type is not supported
- */
- public static String toHiveType(DataType type, String name) {
- switch (type) {
- case BOOLEAN:
- case SMALLINT:
- case BIGINT:
- case TIMESTAMP:
- return type.toString().toLowerCase();
- case NUMERIC:
- return "decimal";
- case BYTEA:
- return "binary";
- case INTEGER:
- return "int";
- case TEXT:
- return "string";
- case REAL:
- return "float";
- case FLOAT8:
- return "double";
- case TIME:
- return "timestamp";
- default:
- throw new UnsupportedTypeException(
- type.toString()
- + " conversion is not supported by HiveInputFormatFragmenter (Field "
- + name + ")");
- }
- }
-
- /*
- * Validates that partition format corresponds to PXF supported formats and
- * transforms the class name to an enumeration for writing it to the
- * accessors on other PXF instances.
- */
- private String assertFileType(String className, HiveTablePartition partData)
- throws Exception {
- switch (className) {
- case STR_RC_FILE_INPUT_FORMAT:
- return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name();
- case STR_TEXT_FILE_INPUT_FORMAT:
- return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name();
- default:
- throw new IllegalArgumentException(
- "HiveInputFormatFragmenter does not yet support "
- + className
- + " for "
- + partData
- + ". Supported InputFormat are "
- + Arrays.toString(PXF_HIVE_INPUT_FORMATS.values()));
- }
- }
-
- /*
- * Validates that partition serde corresponds to PXF supported serdes and
- * transforms the class name to an enumeration for writing it to the
- * resolvers on other PXF instances.
- */
- private String assertSerde(String className, HiveTablePartition partData)
- throws Exception {
- switch (className) {
- case STR_COLUMNAR_SERDE:
- return PXF_HIVE_SERDES.COLUMNAR_SERDE.name();
- case STR_LAZY_BINARY_COLUMNAR_SERDE:
- return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
- case STR_LAZY_SIMPLE_SERDE:
- return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
- default:
- throw new UnsupportedTypeException(
- "HiveInputFormatFragmenter does not yet support "
- + className + " for " + partData
- + ". Supported serializers are: "
- + Arrays.toString(PXF_HIVE_SERDES.values()));
- }
- }
-
- @Override
- byte[] makeUserData(HiveTablePartition partData) throws Exception {
- String inputFormatName = partData.storageDesc.getInputFormat();
- String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
- String partitionKeys = serializePartitionKeys(partData);
-
- assertFileType(inputFormatName, partData);
- String userData = assertSerde(serdeName, partData) + HIVE_UD_DELIM
- + partitionKeys + HIVE_UD_DELIM + filterInFragmenter;
-
- return userData.getBytes();
- }
-}
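The userData produced by makeUserData() above carries exactly EXPECTED_NUM_OF_TOKS (3) tokens joined with HIVE_UD_DELIM, which parseToks() splits back apart. A sketch with hypothetical values (a single string partition year=2015, filter applied in the fragmenter):

    public class UserDataSketch {
        public static void main(String[] args) {
            String userData = "COLUMNAR_SERDE"                    // TOK_SERDE
                    + "!HUDD!" + "year!H1PD!string!H1PD!2015"     // TOK_KEYS
                    + "!HUDD!" + "true";                          // TOK_FILTER_DONE
            String[] toks = userData.split("!HUDD!");             // HIVE_UD_DELIM
            System.out.println(toks.length);                      // 3
            System.out.println(toks[0]);                          // COLUMNAR_SERDE
            System.out.println(toks[2]);                          // true
        }
    }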
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveLineBreakAccessor.java
deleted file mode 100644
index b293123..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveLineBreakAccessor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.utilities.InputData;
-
-import org.apache.hadoop.mapred.*;
-
-import java.io.IOException;
-
-import static com.pivotal.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
-
-/**
- * Specialization of HiveAccessor for a Hive table stored as Text files.
- * Use together with {@link HiveInputFormatFragmenter}/{@link HiveStringPassResolver}.
- */
-public class HiveLineBreakAccessor extends HiveAccessor {
-
- /**
- * Constructs a HiveLineBreakAccessor.
- *
- * @param input input containing user data
- * @throws Exception if user data was wrong
- */
- public HiveLineBreakAccessor(InputData input) throws Exception {
- super(input, new TextInputFormat());
- ((TextInputFormat) inputFormat).configure(jobConf);
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
- }
-
- @Override
- protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
- return new LineRecordReader(jobConf, (FileSplit) split);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcher.java
deleted file mode 100644
index d68df09..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveMetadataFetcher.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import com.pivotal.pxf.api.Metadata;
-import com.pivotal.pxf.api.MetadataFetcher;
-import com.pivotal.pxf.api.UnsupportedTypeException;
-import com.pivotal.pxf.plugins.hive.utilities.HiveUtilities;
-
-/**
- * Class for connecting to Hive's MetaStore and getting schema of Hive tables.
- */
-public class HiveMetadataFetcher extends MetadataFetcher {
-
- private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
- private HiveMetaStoreClient client;
-
- public HiveMetadataFetcher() {
- super();
-
- // init hive metastore client connection.
- client = HiveUtilities.initHiveClient();
- }
-
- @Override
- public Metadata getTableMetadata(String tableName) throws Exception {
-
- Metadata.Table tblDesc = HiveUtilities.parseTableQualifiedName(tableName);
- Metadata metadata = new Metadata(tblDesc);
-
- Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
-
- getSchema(tbl, metadata);
-
- return metadata;
- }
-
-
- /**
- * Populates the given metadata object with the given table's fields and partitions.
- * The partition fields are added at the end of the table schema.
- * Throws an exception if the table contains unsupported field types.
- *
- * @param tbl Hive table
- * @param metadata schema of given table
- */
- private void getSchema(Table tbl, Metadata metadata) {
-
- int hiveColumnsSize = tbl.getSd().getColsSize();
- int hivePartitionsSize = tbl.getPartitionKeysSize();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions.");
- }
-
- // check hive fields
- try {
- List<FieldSchema> hiveColumns = tbl.getSd().getCols();
- for (FieldSchema hiveCol : hiveColumns) {
- metadata.addField(HiveUtilities.mapHiveType(hiveCol));
- }
- // check partition fields
- List<FieldSchema> hivePartitions = tbl.getPartitionKeys();
- for (FieldSchema hivePart : hivePartitions) {
- metadata.addField(HiveUtilities.mapHiveType(hivePart));
- }
- } catch (UnsupportedTypeException e) {
- String errorMsg = "Failed to retrieve metadata for table " + metadata.getTable() + ". " +
- e.getMessage();
- throw new UnsupportedTypeException(errorMsg);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c7ab9eb/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveRCFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveRCFileAccessor.java
deleted file mode 100644
index 6e64296..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveRCFileAccessor.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.utilities.InputData;
-
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-
-import static com.pivotal.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
-
-/**
- * Specialization of HiveAccessor for a Hive table stored entirely as RC files.
- * It replaces the generic HiveAccessor for this case.
- * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}.
- */
-public class HiveRCFileAccessor extends HiveAccessor {
-
- /**
- * Constructs a HiveRCFileAccessor.
- *
- * @param input input containing user data
- * @throws Exception if user data was wrong
- */
- public HiveRCFileAccessor(InputData input) throws Exception {
- super(input, new RCFileInputFormat());
- String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name());
- initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
- filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
- }
-
- @Override
- protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
- return new RCFileRecordReader(jobConf, (FileSplit) split);
- }
-}