Posted to commits@hawq.apache.org by sh...@apache.org on 2016/09/08 20:56:06 UTC
[4/4] incubator-hawq git commit: HAWQ-931. ORC optimized profile for PPD/CP
HAWQ-931. ORC optimized profile for PPD/CP (predicate pushdown / column projection)
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/98a302da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/98a302da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/98a302da
Branch: refs/heads/HAWQ-931
Commit: 98a302da09db7952d782809c784c7a820ee579ef
Parents: 8906240
Author: Shivram Mani <sh...@gmail.com>
Authored: Wed Jul 27 17:48:59 2016 -0700
Committer: Shivram Mani <sh...@gmail.com>
Committed: Tue Aug 2 15:12:58 2016 -0700
----------------------------------------------------------------------
pxf/build.gradle | 1 +
pxf/gradle.properties | 3 +-
.../pxf/api/utilities/ColumnDescriptor.java | 28 +-
.../plugins/hive/HiveColumnarSerdeResolver.java | 7 +-
.../plugins/hive/HiveInputFormatFragmenter.java | 16 +-
.../hawq/pxf/plugins/hive/HiveORCAccessor.java | 170 +++++++
.../pxf/plugins/hive/HiveORCSerdeResolver.java | 439 +++++++++++++++++++
.../hawq/pxf/service/rest/RestResource.java | 8 +-
.../pxf/service/utilities/ProtocolData.java | 30 +-
.../src/main/resources/pxf-profiles-default.xml | 14 +
10 files changed, 707 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/build.gradle
----------------------------------------------------------------------
diff --git a/pxf/build.gradle b/pxf/build.gradle
index 23d688f..cd29c01 100644
--- a/pxf/build.gradle
+++ b/pxf/build.gradle
@@ -314,6 +314,7 @@ project('pxf-hive') {
compile "org.apache.hive:hive-metastore:$hiveVersion"
compile "org.apache.hive:hive-common:$hiveVersion"
compile "org.apache.hive:hive-serde:$hiveVersion"
+ compile "org.apache.orc:orc-core:$orcVersion"
testCompile 'pl.pragmatists:JUnitParams:1.0.2'
configurations {
// Remove hive-exec from unit tests as it causes VerifyError
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/gradle.properties
----------------------------------------------------------------------
diff --git a/pxf/gradle.properties b/pxf/gradle.properties
index 6827b89..a601936 100644
--- a/pxf/gradle.properties
+++ b/pxf/gradle.properties
@@ -23,4 +23,5 @@ hiveVersion=1.2.1
hbaseVersionJar=1.1.2
hbaseVersionRPM=1.1.2
tomcatVersion=7.0.62
-pxfProtocolVersion=v14
\ No newline at end of file
+pxfProtocolVersion=v14
+orcVersion=1.1.1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
index baaca1d..4b9dc9c 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
@@ -30,6 +30,7 @@ public class ColumnDescriptor {
String gpdbColumnName;
String gpdbColumnTypeName;
int gpdbColumnIndex;
+ boolean isProjected;
/**
* Reserved word for a table record key.
@@ -50,6 +51,21 @@ public class ColumnDescriptor {
gpdbColumnTypeName = typename;
gpdbColumnName = name;
gpdbColumnIndex = index;
+ isProjected = true;
+ }
+
+ /**
+ * Constructs a ColumnDescriptor.
+ *
+ * @param name column name
+ * @param typecode OID
+ * @param index column index
+ * @param typename type name
+ * @param isProj true if the column is projected, false otherwise
+ */
+ public ColumnDescriptor(String name, int typecode, int index, String typename, boolean isProj) {
+ this(name, typecode, index, typename);
+ isProjected = isProj;
}
/**
@@ -62,6 +78,7 @@ public class ColumnDescriptor {
this.gpdbColumnName = copy.gpdbColumnName;
this.gpdbColumnIndex = copy.gpdbColumnIndex;
this.gpdbColumnTypeName = copy.gpdbColumnTypeName;
+ this.isProjected = copy.isProjected;
}
public String columnName() {
@@ -89,11 +106,20 @@ public class ColumnDescriptor {
return RECORD_KEY_NAME.equalsIgnoreCase(gpdbColumnName);
}
+ public boolean isProjected() {
+ return isProjected;
+ }
+
+ public void setProjected(boolean projected) {
+ isProjected = projected;
+ }
+
@Override
public String toString() {
return "ColumnDescriptor [gpdbColumnTypeCode=" + gpdbColumnTypeCode
+ ", gpdbColumnName=" + gpdbColumnName
+ ", gpdbColumnTypeName=" + gpdbColumnTypeName
- + ", gpdbColumnIndex=" + gpdbColumnIndex + "]";
+ + ", gpdbColumnIndex=" + gpdbColumnIndex
+ + ", isProjected=" + isProjected + "]";
}
}
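For readers following the column-projection (CP) side of this change: a minimal standalone sketch, not part of this commit, showing how the new projection-aware ColumnDescriptor constructor and the isProjected()/setProjected() accessors can be used. The column names and type OIDs below are made up for illustration.

import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;

public class ColumnProjectionExample {
    public static void main(String[] args) {
        // Hypothetical tuple description: only "amount" is requested by the query.
        ColumnDescriptor id = new ColumnDescriptor("id", 23, 0, "int4", false);
        ColumnDescriptor amount = new ColumnDescriptor("amount", 701, 1, "float8", true);

        // The original four-argument constructor keeps the old behavior: projected by default.
        ColumnDescriptor name = new ColumnDescriptor("name", 25, 2, "text");
        name.setProjected(false);

        for (ColumnDescriptor col : new ColumnDescriptor[]{id, amount, name}) {
            System.out.println(col.columnName() + " projected=" + col.isProjected());
        }
    }
}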
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index d298bac..497ee2e 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -19,7 +19,6 @@ package org.apache.hawq.pxf.plugins.hive;
* under the License.
*/
-
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
@@ -31,6 +30,7 @@ import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedColumnarSerDe;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDeBase;
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -76,6 +77,8 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
} else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
+ } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name())) {
+ serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE;
} else {
throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
}
@@ -138,6 +141,8 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
deserializer = new ColumnarSerDe();
} else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
deserializer = new LazyBinaryColumnarSerDe();
+ } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE) {
+ deserializer = new VectorizedColumnarSerDe();
} else {
throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index a666b8b..955aa9a 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -56,9 +56,12 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+ static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+ static final String STR_VECTORIZED_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.VectorizedOrcSerde";
+ static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
private static final int EXPECTED_NUM_OF_TOKS = 3;
public static final int TOK_SERDE = 0;
public static final int TOK_KEYS = 1;
@@ -67,14 +70,17 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
/** Defines the Hive input formats currently supported in pxf */
public enum PXF_HIVE_INPUT_FORMATS {
RC_FILE_INPUT_FORMAT,
- TEXT_FILE_INPUT_FORMAT
+ TEXT_FILE_INPUT_FORMAT,
+ ORC_FILE_INPUT_FORMAT
}
/** Defines the Hive serializers (serde classes) currently supported in pxf */
public enum PXF_HIVE_SERDES {
COLUMNAR_SERDE,
LAZY_BINARY_COLUMNAR_SERDE,
- LAZY_SIMPLE_SERDE
+ LAZY_SIMPLE_SERDE,
+ ORC_SERDE,
+ VECTORIZED_ORC_SERDE
}
/**
@@ -234,6 +240,8 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name();
case STR_TEXT_FILE_INPUT_FORMAT:
return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name();
+ case STR_ORC_FILE_INPUT_FORMAT:
+ return PXF_HIVE_INPUT_FORMATS.ORC_FILE_INPUT_FORMAT.name();
default:
throw new IllegalArgumentException(
"HiveInputFormatFragmenter does not yet support "
@@ -259,6 +267,10 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
case STR_LAZY_SIMPLE_SERDE:
return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
+ case STR_ORC_SERDE:
+ return PXF_HIVE_SERDES.ORC_SERDE.name();
+ case STR_VECTORIZED_ORC_SERDE:
+ return PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name();
default:
throw new UnsupportedTypeException(
"HiveInputFormatFragmenter does not yet support "
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
new file mode 100644
index 0000000..43c48b2
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -0,0 +1,170 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+
+/**
+ * Specialization of HiveAccessor for a Hive table that stores only ORC files.
+ * It replaces the generic HiveAccessor for the case where a table is stored entirely as ORC files.
+ * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}.
+ */
+public class HiveORCAccessor extends HiveAccessor {
+
+ private RecordReader batchReader = null;
+ private Reader reader = null;
+ private VectorizedRowBatch batch = null;
+
+ private final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+ private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
+ private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+ private final String SARG_PUSHDOWN = "sarg.pushdown";
+
+ /**
+ * Constructs a HiveORCAccessor.
+ *
+ * @param input input containing user data
+ * @throws Exception if user data was wrong
+ */
+ public HiveORCAccessor(InputData input) throws Exception {
+ super(input, new OrcInputFormat());
+ String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.ORC_SERDE.name(), PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name());
+ initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
+ filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+ }
+
+ @Override
+ public boolean openForRead() throws Exception {
+ addColumns();
+ addFilters();
+ return super.openForRead();
+ }
+
+ @Override
+ protected Object getReader(JobConf jobConf, InputSplit split)
+ throws IOException {
+ return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+ }
+
+ /**
+ * Adds the table tuple description to the JobConf object
+ * so that only the projected columns will be returned.
+ */
+ private void addColumns() throws Exception {
+
+ List<String> colIds = new ArrayList<String>();
+ List<String> colNames = new ArrayList<String>();
+ for(ColumnDescriptor col: inputData.getTupleDescription()) {
+ if(col.isProjected()) {
+ colIds.add(String.valueOf(col.columnIndex()));
+ colNames.add(col.columnName());
+ }
+ }
+ jobConf.set(READ_ALL_COLUMNS, "false");
+ jobConf.set(READ_COLUMN_IDS_CONF_STR, StringUtils.join(colIds, ","));
+ jobConf.set(READ_COLUMN_NAMES_CONF_STR, StringUtils.join(colNames, ","));
+ }
+
+ /**
+ * Uses {@link HiveFilterBuilder} to translate a filter string into a
+ * Hive {@link SearchArgument} object. The result is added as a filter to the
+ * JobConf object.
+ */
+ private void addFilters() throws Exception {
+ if (!inputData.hasFilter()) {
+ return;
+ }
+
+ /* Predicate pushdown configuration */
+ String filterStr = inputData.getFilterString();
+ HiveFilterBuilder eval = new HiveFilterBuilder(inputData);
+ Object filter = eval.getFilterObject(filterStr);
+
+ SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder();
+ filterBuilder.startAnd();
+ if (filter instanceof List) {
+ for (Object f : (List<?>) filter) {
+ buildArgument(filterBuilder, f);
+ }
+ } else {
+ buildArgument(filterBuilder, filter);
+ }
+ filterBuilder.end();
+ SearchArgument sarg = filterBuilder.build();
+ jobConf.set(SARG_PUSHDOWN, sarg.toKryo());
+ }
+
+ private void buildArgument(SearchArgument.Builder builder, Object filterObj) {
+ /* The calls below are not compatible with the Hive 2.0 APIs and will require updating */
+ FilterParser.BasicFilter filter = (FilterParser.BasicFilter) filterObj;
+ int filterColumnIndex = filter.getColumn().index();
+ Object filterValue = filter.getConstant().constant();
+ ColumnDescriptor filterColumn = inputData.getColumn(filterColumnIndex);
+ String filterColumnName = filterColumn.columnName();
+
+ switch(filter.getOperation()) {
+ case HDOP_LT:
+ builder.lessThan(filterColumnName, filterValue);
+ break;
+ case HDOP_GT:
+ builder.startNot().lessThanEquals(filterColumnName, filterValue).end();
+ break;
+ case HDOP_LE:
+ builder.lessThanEquals(filterColumnName, filterValue);
+ break;
+ case HDOP_GE:
+ builder.startNot().lessThan(filterColumnName, filterValue).end();
+ break;
+ case HDOP_EQ:
+ builder.equals(filterColumnName, filterValue);
+ break;
+ case HDOP_NE:
+ builder.startNot().equals(filterColumnName, filterValue).end();
+ break;
+ case HDOP_LIKE:
+ break;
+ }
+ return;
+ }
+
+}
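For context on the predicate-pushdown (PPD) path: a minimal standalone sketch, not part of this commit, of how a SearchArgument is assembled and handed to the ORC reader through the job configuration, mirroring addFilters()/buildArgument() above. It assumes the Hive 1.2.1 sarg builder API used in this patch; the column names and literals are hypothetical, and the property key matches the SARG_PUSHDOWN constant above.

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.mapred.JobConf;

public class SargPushdownExample {
    public static void main(String[] args) {
        // Translate a hypothetical filter "price < 100 AND qty <> 0" into a SearchArgument.
        SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
        builder.startAnd();
        builder.lessThan("price", 100L);                 // HDOP_LT
        builder.startNot().equals("qty", 0L).end();      // HDOP_NE
        builder.end();
        SearchArgument sarg = builder.build();

        // The serialized argument is passed to the ORC reader via the job configuration,
        // exactly as addFilters() does with the "sarg.pushdown" property.
        JobConf jobConf = new JobConf();
        jobConf.set("sarg.pushdown", sarg.toKryo());
        System.out.println(sarg);
    }
}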
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
new file mode 100644
index 0000000..6ac4e70
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -0,0 +1,439 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.lang.CharUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.*;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+import static org.apache.hawq.pxf.api.io.DataType.DATE;
+import static org.apache.hawq.pxf.api.io.DataType.SMALLINT;
+
+/**
+ * Specialized HiveResolver for a Hive table stored as ORC files.
+ * Use together with HiveInputFormatFragmenter/HiveORCAccessor.
+ */
+public class HiveORCSerdeResolver extends HiveResolver {
+ private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
+ private OrcSerde deserializer;
+ private boolean firstColumn;
+ private StringBuilder builder;
+ private StringBuilder parts;
+ private int numberOfPartitions;
+ private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+ private static final String MAPKEY_DELIM = ":";
+ private static final String COLLECTION_DELIM = ",";
+ private String collectionDelim;
+ private String mapkeyDelim;
+
+ public HiveORCSerdeResolver(InputData input) throws Exception {
+ super(input);
+ }
+
+ /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
+ @Override
+ void parseUserData(InputData input) throws Exception {
+ String[] toks = HiveInputFormatFragmenter.parseToks(input);
+ String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
+ if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
+ serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
+ } else {
+ throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
+ }
+ parts = new StringBuilder();
+ partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+ parseDelimiterChar(input);
+ collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
+ : input.getUserProperty("COLLECTION_DELIM");
+ mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
+ : input.getUserProperty("MAPKEY_DELIM");
+ }
+
+ @Override
+ void initPartitionFields() {
+ numberOfPartitions = initPartitionFields(parts);
+ }
+
+ /**
+ * getFields returns a list of OneField items, one per column of the resolved record.
+ * Each OneField item contains two fields: an integer representing the column type and a Java
+ * Object representing the field value.
+ */
+ @Override
+ public List<OneField> getFields(OneRow onerow) throws Exception {
+
+ Object tuple = deserializer.deserialize((Writable) onerow.getData());
+ // Each Hive record is a Struct
+ StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
+ List<OneField> record = traverseStruct(tuple, soi, false);
+
+ return record;
+
+ }
+
+ /*
+ * Get and init the deserializer for the records of this Hive data fragment.
+ * Suppress Warnings added because deserializer.initialize is an abstract function that is deprecated
+ * but its implementation (OrcSerde) still uses the deprecated interface.
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ void initSerde(InputData input) throws Exception {
+ Properties serdeProperties = new Properties();
+ int numberOfDataColumns = input.getColumns() - numberOfPartitions;
+
+ LOG.debug("Serde number of columns is " + numberOfDataColumns);
+
+ StringBuilder columnNames = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
+ StringBuilder columnTypes = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
+ String delim = "";
+ for (int i = 0; i < numberOfDataColumns; i++) {
+ ColumnDescriptor column = input.getColumn(i);
+ String columnName = column.columnName();
+ String columnType = HiveInputFormatFragmenter.toHiveType(DataType.get(column.columnTypeCode()), columnName);
+ columnNames.append(delim).append(columnName);
+ columnTypes.append(delim).append(columnType);
+ delim = ",";
+ }
+ serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
+ serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
+
+ if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
+ deserializer = new OrcSerde();
+ } else {
+ throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
+ }
+
+ deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties);
+ }
+
+ /*
+ * If the object representing the whole record is null or if an object
+ * representing a composite sub-object (map, list,..) is null - then
+ * BadRecordException will be thrown. If a primitive field value is null,
+ * then a null will appear for the field in the record in the query result.
+ */
+ private void traverseTuple(Object obj, ObjectInspector objInspector,
+ List<OneField> record, boolean toFlatten)
+ throws IOException, BadRecordException {
+ ObjectInspector.Category category = objInspector.getCategory();
+ if ((obj == null) && (category != ObjectInspector.Category.PRIMITIVE)) {
+ throw new BadRecordException("NULL Hive composite object");
+ }
+ switch (category) {
+ case PRIMITIVE:
+ resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector,
+ record, toFlatten);
+ break;
+ case LIST:
+ List<OneField> listRecord = traverseList(obj,
+ (ListObjectInspector) objInspector);
+ addOneFieldToRecord(record, TEXT, String.format("[%s]",
+ HdfsUtilities.toString(listRecord, collectionDelim)));
+ break;
+ case MAP:
+ List<OneField> mapRecord = traverseMap(obj,
+ (MapObjectInspector) objInspector);
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(mapRecord, collectionDelim)));
+ break;
+ case STRUCT:
+ List<OneField> structRecord = traverseStruct(obj,
+ (StructObjectInspector) objInspector, true);
+ addOneFieldToRecord(record, TEXT, String.format("{%s}",
+ HdfsUtilities.toString(structRecord, collectionDelim)));
+ break;
+ case UNION:
+ List<OneField> unionRecord = traverseUnion(obj,
+ (UnionObjectInspector) objInspector);
+ addOneFieldToRecord(record, TEXT, String.format("[%s]",
+ HdfsUtilities.toString(unionRecord, collectionDelim)));
+ break;
+ default:
+ throw new UnsupportedTypeException("Unknown category type: "
+ + objInspector.getCategory());
+ }
+ }
+
+ private List<OneField> traverseUnion(Object obj, UnionObjectInspector uoi)
+ throws BadRecordException, IOException {
+ List<OneField> unionRecord = new LinkedList<>();
+ List<? extends ObjectInspector> ois = uoi.getObjectInspectors();
+ if (ois == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type Union");
+ }
+ traverseTuple(uoi.getField(obj), ois.get(uoi.getTag(obj)), unionRecord,
+ true);
+ return unionRecord;
+ }
+
+ private List<OneField> traverseList(Object obj, ListObjectInspector loi)
+ throws BadRecordException, IOException {
+ List<OneField> listRecord = new LinkedList<>();
+ List<?> list = loi.getList(obj);
+ ObjectInspector eoi = loi.getListElementObjectInspector();
+ if (list == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type List");
+ }
+ for (Object object : list) {
+ traverseTuple(object, eoi, listRecord, true);
+ }
+ return listRecord;
+ }
+
+ private List<OneField> traverseStruct(Object struct,
+ StructObjectInspector soi,
+ boolean toFlatten)
+ throws BadRecordException, IOException {
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> structFields = soi.getStructFieldsDataAsList(struct);
+ if (structFields == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type Struct");
+ }
+ List<OneField> structRecord = new LinkedList<>();
+ List<OneField> complexRecord = new LinkedList<>();
+ for (int i = 0; i < structFields.size(); i++) {
+ if (toFlatten) {
+ complexRecord.add(new OneField(TEXT.getOID(), String.format(
+ "\"%s\"", fields.get(i).getFieldName())));
+ }
+ traverseTuple(structFields.get(i),
+ fields.get(i).getFieldObjectInspector(), complexRecord,
+ toFlatten);
+ if (toFlatten) {
+ addOneFieldToRecord(structRecord, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ complexRecord.clear();
+ }
+ }
+ return toFlatten ? structRecord : complexRecord;
+ }
+
+ private List<OneField> traverseMap(Object obj, MapObjectInspector moi)
+ throws BadRecordException, IOException {
+ List<OneField> complexRecord = new LinkedList<>();
+ List<OneField> mapRecord = new LinkedList<>();
+ ObjectInspector koi = moi.getMapKeyObjectInspector();
+ ObjectInspector voi = moi.getMapValueObjectInspector();
+ Map<?, ?> map = moi.getMap(obj);
+ if (map == null) {
+ throw new BadRecordException(
+ "Illegal value NULL for Hive data type Map");
+ } else if (map.isEmpty()) {
+ traverseTuple(null, koi, complexRecord, true);
+ traverseTuple(null, voi, complexRecord, true);
+ addOneFieldToRecord(mapRecord, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ } else {
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ traverseTuple(entry.getKey(), koi, complexRecord, true);
+ traverseTuple(entry.getValue(), voi, complexRecord, true);
+ addOneFieldToRecord(mapRecord, TEXT,
+ HdfsUtilities.toString(complexRecord, mapkeyDelim));
+ complexRecord.clear();
+ }
+ }
+ return mapRecord;
+ }
+
+ private void resolvePrimitive(Object o, PrimitiveObjectInspector oi,
+ List<OneField> record, boolean toFlatten)
+ throws IOException {
+ Object val;
+ switch (oi.getPrimitiveCategory()) {
+ case BOOLEAN: {
+ val = (o != null) ? ((BooleanObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, BOOLEAN, val);
+ break;
+ }
+ case SHORT: {
+ val = (o != null) ? ((ShortObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, SMALLINT, val);
+ break;
+ }
+ case INT: {
+ val = (o != null) ? ((IntObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, INTEGER, val);
+ break;
+ }
+ case LONG: {
+ val = (o != null) ? ((LongObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, BIGINT, val);
+ break;
+ }
+ case FLOAT: {
+ val = (o != null) ? ((FloatObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, REAL, val);
+ break;
+ }
+ case DOUBLE: {
+ val = (o != null) ? ((DoubleObjectInspector) oi).get(o) : null;
+ addOneFieldToRecord(record, FLOAT8, val);
+ break;
+ }
+ case DECIMAL: {
+ String sVal = null;
+ if (o != null) {
+ HiveDecimal hd = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+ if (hd != null) {
+ BigDecimal bd = hd.bigDecimalValue();
+ sVal = bd.toString();
+ }
+ }
+ addOneFieldToRecord(record, NUMERIC, sVal);
+ break;
+ }
+ case STRING: {
+ val = (o != null) ? ((StringObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, TEXT,
+ toFlatten ? String.format("\"%s\"", val) : val);
+ break;
+ }
+ case VARCHAR:
+ val = (o != null) ? ((HiveVarcharObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, VARCHAR,
+ toFlatten ? String.format("\"%s\"", val) : val);
+ break;
+ case CHAR:
+ val = (o != null) ? ((HiveCharObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, BPCHAR,
+ toFlatten ? String.format("\"%s\"", val) : val);
+ break;
+ case BINARY: {
+ byte[] toEncode = null;
+ if (o != null) {
+ BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(o);
+ toEncode = new byte[bw.getLength()];
+ System.arraycopy(bw.getBytes(), 0, toEncode, 0,
+ bw.getLength());
+ }
+ addOneFieldToRecord(record, BYTEA, toEncode);
+ break;
+ }
+ case TIMESTAMP: {
+ val = (o != null) ? ((TimestampObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, TIMESTAMP, val);
+ break;
+ }
+ case DATE:
+ val = (o != null) ? ((DateObjectInspector) oi).getPrimitiveJavaObject(o)
+ : null;
+ addOneFieldToRecord(record, DATE, val);
+ break;
+ case BYTE: { /* TINYINT */
+ val = (o != null) ? new Short(((ByteObjectInspector) oi).get(o))
+ : null;
+ addOneFieldToRecord(record, SMALLINT, val);
+ break;
+ }
+ default: {
+ throw new UnsupportedTypeException(oi.getTypeName()
+ + " conversion is not supported by "
+ + getClass().getSimpleName());
+ }
+ }
+ }
+
+ private void addOneFieldToRecord(List<OneField> record,
+ DataType gpdbWritableType, Object val) {
+ record.add(new OneField(gpdbWritableType.getOID(), val));
+ }
+
+ /*
+ * Gets the delimiter character from the URL, verifies it and stores it. Must be a
+ * single ASCII character (same restriction as HAWQ's). If a hex
+ * representation was passed, converts it to its char.
+ */
+ void parseDelimiterChar(InputData input) {
+
+ String userDelim = input.getUserProperty("DELIMITER");
+
+ if (userDelim == null) {
+ throw new IllegalArgumentException("DELIMITER is a required option");
+ }
+
+ final int VALID_LENGTH = 1;
+ final int VALID_LENGTH_HEX = 4;
+
+ if (userDelim.startsWith("\\x")) { // hexadecimal sequence
+
+ if (userDelim.length() != VALID_LENGTH_HEX) {
+ throw new IllegalArgumentException(
+ "Invalid hexdecimal value for delimiter (got"
+ + userDelim + ")");
+ }
+
+ delimiter = (char) Integer.parseInt(
+ userDelim.substring(2, VALID_LENGTH_HEX), 16);
+
+ if (!CharUtils.isAscii(delimiter)) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+ + delimiter + ")");
+ }
+
+ return;
+ }
+
+ if (userDelim.length() != VALID_LENGTH) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
+ + userDelim + ")");
+ }
+
+ if (!CharUtils.isAscii(userDelim.charAt(0))) {
+ throw new IllegalArgumentException(
+ "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+ + userDelim + ")");
+ }
+
+ delimiter = userDelim.charAt(0);
+ }
+}
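As background for the resolver above: a minimal standalone sketch, not part of this commit, of the OrcSerde deserialization path that initSerde()/getFields() rely on — initialize the serde with column names/types, deserialize a Writable row into a Struct, and walk it with its StructObjectInspector. The two-column schema is hypothetical, and the Writable row is assumed to come from an ORC record reader.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;

public class OrcDeserializeExample {
    // 'row' stands in for a Writable produced by the ORC record reader.
    static void dumpRow(Writable row) throws Exception {
        Properties serdeProperties = new Properties();
        serdeProperties.put(serdeConstants.LIST_COLUMNS, "id,amount");       // hypothetical schema
        serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, "int,double");

        OrcSerde serde = new OrcSerde();
        serde.initialize(new JobConf(new Configuration(), OrcDeserializeExample.class), serdeProperties);

        // Each ORC record deserializes into a Struct that is traversed via its ObjectInspector,
        // which is what getFields()/traverseStruct() do above.
        Object tuple = serde.deserialize(row);
        StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
        for (StructField field : soi.getAllStructFieldRefs()) {
            System.out.println(field.getFieldName() + " = " + soi.getStructFieldData(tuple, field));
        }
    }
}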
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
index 60bb31e..633e78c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
@@ -24,6 +24,7 @@ import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.codec.CharEncoding;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.StringUtils;
import java.io.UnsupportedEncodingException;
import java.util.List;
@@ -56,7 +57,12 @@ public abstract class RestResource {
String key = entry.getKey();
List<String> values = entry.getValue();
if (values != null) {
- String value = values.get(0);
+ String value;
+ if(values.size() > 1) {
+ value = StringUtils.join(values, ",");
+ } else {
+ value = values.get(0);
+ }
if (value != null) {
// converting to value UTF-8 encoding
value = new String(value.getBytes(CharEncoding.ISO_8859_1),
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 0337937..f492378 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -383,16 +383,40 @@ public class ProtocolData extends InputData {
/*
* Sets the tuple description for the record
+ * Attribute Projection information is optional
*/
void parseTupleDescription() {
+
+ /* Process column projection info */
+ String columnProjStr = getOptionalProperty("ATTRS-PROJ");
+ List<Integer> columnProjList = new ArrayList<Integer>();
+ if(columnProjStr != null) {
+ int columnProj = Integer.parseInt(columnProjStr);
+ if(columnProj > 0) {
+ String columnProjIndexStr = getProperty("ATTRS-PROJ-IDX");
+ String columnProjIdx[] = columnProjIndexStr.split(",");
+ for(int i = 0; i < columnProj; i++) {
+ columnProjList.add(Integer.valueOf(columnProjIdx[i]));
+ }
+ } else {
+ /* This is a special case to handle aggregate queries not related to any specific column
+ * e.g. count(*) queries. */
+ columnProjList.add(0);
+ }
+ }
+
int columns = getIntProperty("ATTRS");
for (int i = 0; i < columns; ++i) {
String columnName = getProperty("ATTR-NAME" + i);
int columnTypeCode = getIntProperty("ATTR-TYPECODE" + i);
String columnTypeName = getProperty("ATTR-TYPENAME" + i);
-
- ColumnDescriptor column = new ColumnDescriptor(columnName,
- columnTypeCode, i, columnTypeName);
+ ColumnDescriptor column;
+ if(columnProjStr != null) {
+ column = new ColumnDescriptor(columnName, columnTypeCode, i, columnTypeName, columnProjList.contains(i));
+ } else {
+ /* For data formats that don't support column projection */
+ column = new ColumnDescriptor(columnName, columnTypeCode, i, columnTypeName);
+ }
tupleDescription.add(column);
if (columnName.equalsIgnoreCase(ColumnDescriptor.RECORD_KEY_NAME)) {
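To illustrate the new protocol headers: a minimal standalone sketch, not part of this commit, that mirrors the ATTRS-PROJ / ATTRS-PROJ-IDX handling in parseTupleDescription() above. The header values are hypothetical; in ProtocolData each column index in the projection list ends up as the isProjected flag of a ColumnDescriptor.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AttrsProjExample {
    public static void main(String[] args) {
        // Hypothetical request parameters: 3 attributes, 2 of them projected (indexes 0 and 2).
        Map<String, String> params = new HashMap<>();
        params.put("ATTRS", "3");
        params.put("ATTRS-PROJ", "2");
        params.put("ATTRS-PROJ-IDX", "0,2");

        List<Integer> projected = new ArrayList<>();
        String proj = params.get("ATTRS-PROJ");
        if (proj != null && Integer.parseInt(proj) > 0) {
            for (String idx : params.get("ATTRS-PROJ-IDX").split(",")) {
                projected.add(Integer.valueOf(idx));
            }
        }

        int columns = Integer.parseInt(params.get("ATTRS"));
        for (int i = 0; i < columns; i++) {
            System.out.println("column " + i + " projected=" + projected.contains(i));
        }
    }
}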
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index d72df94..9025cc1 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -80,6 +80,20 @@ under the License.
</plugins>
</profile>
<profile>
+ <name>HiveORC</name>
+ <description>This profile is suitable only for Hive tables stored in ORC files
+ and serialized with the OrcSerde.
+ It is much faster than the general purpose Hive profile.
+ DELIMITER parameter is mandatory.
+ </description>
+ <plugins>
+ <fragmenter>org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter>
+ <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCAccessor</accessor>
+ <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCSerdeResolver</resolver>
+ <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+ </plugins>
+ </profile>
+ <profile>
<name>HdfsTextSimple</name>
<description>This profile is suitable for using when reading delimited single line records from plain text files
on HDFS