You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by od...@apache.org on 2017/06/22 02:58:14 UTC
incubator-hawq git commit: HAWQ-1446: Introduce vectorized profile
for ORC.
Repository: incubator-hawq
Updated Branches:
refs/heads/master 339806f3a -> 29a160839
HAWQ-1446: Introduce vectorized profile for ORC.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/29a16083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/29a16083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/29a16083
Branch: refs/heads/master
Commit: 29a160839949a1a08244962c5255933f714af46c
Parents: 339806f
Author: Oleksandr Diachenko <od...@pivotal.io>
Authored: Wed Jun 21 19:58:08 2017 -0700
Committer: Oleksandr Diachenko <od...@pivotal.io>
Committed: Wed Jun 21 19:58:08 2017 -0700
----------------------------------------------------------------------
.../hawq/pxf/api/ReadVectorizedResolver.java | 39 ++
.../org/apache/hawq/pxf/api/StatsAccessor.java | 2 +-
.../hawq/pxf/api/utilities/Utilities.java | 22 +-
.../hawq/pxf/api/utilities/UtilitiesTest.java | 29 ++
.../pxf/plugins/hive/HiveDataFragmenter.java | 5 +-
.../plugins/hive/HiveORCVectorizedAccessor.java | 106 ++++++
.../plugins/hive/HiveORCVectorizedResolver.java | 367 +++++++++++++++++++
.../plugins/hive/utilities/ProfileFactory.java | 15 +-
.../hawq/pxf/service/BridgeOutputBuilder.java | 13 +
.../org/apache/hawq/pxf/service/ReadBridge.java | 2 +-
.../hawq/pxf/service/ReadVectorizedBridge.java | 102 ++++++
.../hawq/pxf/service/rest/BridgeResource.java | 3 +
.../src/main/resources/pxf-profiles-default.xml | 11 +
13 files changed, 707 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java
new file mode 100644
index 0000000..55f8df5
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java
@@ -0,0 +1,39 @@
+package org.apache.hawq.pxf.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+/**
+ * Contract for resolvers which deserialize a whole batch of records
+ * in a single call.
+ */
+public interface ReadVectorizedResolver {
+
+    /**
+     * Resolves all records of the given batch.
+     *
+     * @param batch unresolved batch
+     * @return list of tuples, each tuple being a list of fields
+     */
+    List<List<OneField>> getFieldsForBatch(OneRow batch);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
index d256e77..ec65bd8 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -29,7 +29,7 @@ public interface StatsAccessor extends ReadAccessor {
/**
* Method which reads needed statistics for current split
- * @throws Exception if retrieving the stats failed
+ * @throws Exception when unable to retrieve statistics
*/
public void retrieveStats() throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index ed8ad28..175a6e1 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
import org.apache.hawq.pxf.api.StatsAccessor;
import java.io.ByteArrayInputStream;
@@ -164,7 +165,7 @@ public class Utilities {
* @param inputData input data which has protocol information
* @return fragment metadata
* @throws IllegalArgumentException if fragment metadata information wasn't found in input data
- * @throws Exception if unable to parse the fragment
+ * @throws Exception when error occurred during metadata parsing
*/
public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception {
byte[] serializedLocation = inputData.getFragmentMetadata();
@@ -197,8 +198,8 @@ public class Utilities {
/**
* Based on accessor information determines whether to use AggBridge
- *
- * @param inputData input data
+ *
+ * @param inputData input protocol data
* @return true if AggBridge is applicable for current context
*/
public static boolean useAggBridge(InputData inputData) {
@@ -234,4 +235,19 @@ public class Utilities {
return false;
}
}
+
+ /**
+ * Determines whether use vectorization
+ * @param inputData input protocol data
+ * @return true if vectorization is applicable in a current context
+ */
+ public static boolean useVectorization(InputData inputData) {
+ boolean isVectorizedResolver = false;
+ try {
+ isVectorizedResolver = ArrayUtils.contains(Class.forName(inputData.getResolver()).getInterfaces(), ReadVectorizedResolver.class);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Unable to load resolver class: " + e.getMessage());
+ }
+ return isVectorizedResolver;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
index 01c09bf..5caca6d 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
@@ -30,10 +30,14 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
+import java.util.List;
import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
import org.apache.hawq.pxf.api.StatsAccessor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
@@ -89,6 +93,22 @@ public class UtilitiesTest {
}
}
+ /** Stub which directly implements {@link ReadVectorizedResolver}; it resolves nothing. */
+ class ReadVectorizedResolverImpl implements ReadVectorizedResolver {
+
+ @Override
+ public List<List<OneField>> getFieldsForBatch(OneRow batch) {
+ return null;
+ }
+ }
+
+ /** Stub which implements only {@link ReadResolver}; it resolves nothing. */
+ class ReadResolverImpl implements ReadResolver {
+
+ @Override
+ public List<OneField> getFields(OneRow row) throws Exception {
+ return null;
+ }
+ }
+
@Test
public void byteArrayToOctalStringNull() throws Exception {
StringBuilder sb = null;
@@ -222,4 +242,13 @@ public class UtilitiesTest {
when(metaData.getNumAttrsProjected()).thenReturn(1);
assertFalse(Utilities.useStats(accessor, metaData));
}
+
+ @Test
+ public void useVectorization() {
+ InputData metaData = mock(InputData.class);
+ when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$ReadVectorizedResolverImpl");
+ assertTrue(Utilities.useVectorization(metaData));
+ when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$ReadResolverImpl");
+ assertFalse(Utilities.useVectorization(metaData));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 9cf8f27..6e193c2 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -286,10 +286,11 @@ public class HiveDataFragmenter extends Fragmenter {
InputFormat<?, ?> fformat = makeInputFormat(
tablePartition.storageDesc.getInputFormat(), jobConf);
String profile = null;
- if (inputData.getProfile() != null) {
+ String userProfile = inputData.getProfile();
+ if (userProfile != null) {
// evaluate optimal profile based on file format if profile was explicitly specified in url
// if user passed accessor+fragmenter+resolver - use them
- profile = ProfileFactory.get(fformat, hasComplexTypes);
+ profile = ProfileFactory.get(fformat, hasComplexTypes, userProfile);
}
String fragmenterForProfile = null;
if (profile != null) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java
new file mode 100644
index 0000000..3de1500
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java
@@ -0,0 +1,106 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.*;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.Reader.Options;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * ORC accessor which hands data over batch by batch instead of row by row.
+ * One batch is 1024 rows of all projected columns
+ */
+public class HiveORCVectorizedAccessor extends HiveORCAccessor {
+
+    /* Reader producing the batches of the current fragment */
+    private RecordReader vrr;
+    /* Number of batches handed out so far, used as the key of each OneRow */
+    private int batchIndex;
+    /* Last batch read; passed back to the reader on the next read */
+    private VectorizedRowBatch batch;
+
+    public HiveORCVectorizedAccessor(InputData input) throws Exception {
+        super(input);
+    }
+
+    /**
+     * Opens the ORC reader restricted to the projected columns and to the
+     * split of the current fragment.
+     *
+     * @return true if the reader has at least one batch to read
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        Options readerOptions = new Options();
+        addColumns(readerOptions);
+        addFragments(readerOptions);
+        orcReader = HiveUtilities.getOrcReader(inputData);
+        vrr = orcReader.rowsOptions(readerOptions);
+        return vrr.hasNext();
+    }
+
+    /**
+     * Limits the reader to the split described by the input data,
+     * since a file might consist of multiple splits.
+     *
+     * @param options reader options to modify
+     */
+    private void addFragments(Options options) {
+        FileSplit split = HdfsUtilities.parseFileSplit(inputData);
+        long offset = split.getStart();
+        long length = split.getLength();
+        options.range(offset, length);
+    }
+
+    /**
+     * Reads next batch for current fragment.
+     *
+     * @return next batch in OneRow format, key is a batch number, data is a batch;
+     *         null once all batches are exhausted
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        if (!vrr.hasNext()) {
+            // All batches are exhausted
+            return null;
+        }
+        batch = vrr.nextBatch(batch);
+        LongWritable batchNumber = new LongWritable(++batchIndex);
+        return new OneRow(batchNumber, batch);
+    }
+
+    /**
+     * Updates reader options so that only projected columns are included.
+     * The inclusion flags are shifted by one position relative to the
+     * column index of the tuple description.
+     *
+     * @param options reader options to modify
+     * @throws Exception declared by the signature
+     */
+    private void addColumns(Options options) throws Exception {
+        boolean[] included = new boolean[inputData.getColumns() + 1];
+        for (ColumnDescriptor column : inputData.getTupleDescription()) {
+            if (!column.isProjected()) {
+                continue;
+            }
+            included[column.columnIndex() + 1] = true;
+        }
+        options.include(included);
+    }
+
+    /**
+     * Closes the batch reader if it was opened.
+     */
+    @Override
+    public void closeForRead() throws Exception {
+        if (vrr == null) {
+            return;
+        }
+        vrr.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java
new file mode 100644
index 0000000..5d03d7a
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java
@@ -0,0 +1,367 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.hawq.pxf.api.io.DataType.BIGINT;
+import static org.apache.hawq.pxf.api.io.DataType.BOOLEAN;
+import static org.apache.hawq.pxf.api.io.DataType.BPCHAR;
+import static org.apache.hawq.pxf.api.io.DataType.BYTEA;
+import static org.apache.hawq.pxf.api.io.DataType.DATE;
+import static org.apache.hawq.pxf.api.io.DataType.FLOAT8;
+import static org.apache.hawq.pxf.api.io.DataType.INTEGER;
+import static org.apache.hawq.pxf.api.io.DataType.NUMERIC;
+import static org.apache.hawq.pxf.api.io.DataType.REAL;
+import static org.apache.hawq.pxf.api.io.DataType.SMALLINT;
+import static org.apache.hawq.pxf.api.io.DataType.TEXT;
+import static org.apache.hawq.pxf.api.io.DataType.TIMESTAMP;
+import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.sql.Timestamp;
+import java.sql.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.ql.exec.vector.*;
+
+/**
+ * Resolver which turns a whole {@link VectorizedRowBatch} into rows of
+ * {@link OneField}s in a single call. Only primitive Hive types are supported.
+ */
+public class HiveORCVectorizedResolver extends HiveResolver implements ReadVectorizedResolver {
+
+    private static final Log LOG = LogFactory.getLog(HiveORCVectorizedResolver.class);
+
+    /* Rows of the batch currently being resolved, addressed as [row][column] */
+    private List<List<OneField>> resolvedBatch;
+    /* Inspector describing the fields of the ORC file */
+    private StructObjectInspector soi;
+
+    /**
+     * Creates the resolver and obtains the struct object inspector from the ORC reader.
+     *
+     * @param input input protocol data
+     * @throws Exception if the parent resolver or the object inspector cannot be created
+     */
+    public HiveORCVectorizedResolver(InputData input) throws Exception {
+        super(input);
+        try {
+            soi = (StructObjectInspector) HiveUtilities.getOrcReader(input).getObjectInspector();
+        } catch (Exception e) {
+            LOG.error("Unable to create an object inspector.");
+            throw e;
+        }
+    }
+
+    /**
+     * Resolves every column of the batch and returns the result row by row.
+     *
+     * @param batch OneRow whose data is a {@link VectorizedRowBatch}
+     * @return list of rows, each row holding one field per column
+     * @throws UnsupportedTypeException if a column is not of a primitive type
+     */
+    @Override
+    public List<List<OneField>> getFieldsForBatch(OneRow batch) {
+
+        VectorizedRowBatch vectorizedBatch = (VectorizedRowBatch) batch.getData();
+
+        /* Allocate empty result set */
+        int columnsNumber = inputData.getColumns();
+        resolvedBatch = new ArrayList<List<OneField>>(vectorizedBatch.size);
+
+        /* Create empty template row */
+        List<OneField> templateRow = new ArrayList<OneField>(columnsNumber);
+        for (int j = 0; j < columnsNumber; j++) {
+            templateRow.add(null);
+        }
+        /* Replicate template row */
+        for (int i = 0; i < vectorizedBatch.size; i++) {
+            resolvedBatch.add(new ArrayList<OneField>(templateRow));
+        }
+
+        /* Process all columns */
+        List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
+        for (int columnIndex = 0; columnIndex < vectorizedBatch.numCols; columnIndex++) {
+            ObjectInspector oi = allStructFieldRefs.get(columnIndex).getFieldObjectInspector();
+            if (oi.getCategory() == Category.PRIMITIVE) {
+                resolvePrimitiveColumn(columnIndex, oi, vectorizedBatch);
+            } else {
+                throw new UnsupportedTypeException("Unable to resolve column index:" + columnIndex
+                        + ". Only primitive types are supported.");
+            }
+        }
+
+        return resolvedBatch;
+    }
+
+    /**
+     * Resolves a column of a primitive type out of given batch
+     *
+     * @param columnIndex index of the column
+     * @param oi object inspector
+     * @param vectorizedBatch input batch of records
+     */
+    private void resolvePrimitiveColumn(int columnIndex, ObjectInspector oi, VectorizedRowBatch vectorizedBatch) {
+        PrimitiveCategory poc = ((PrimitiveObjectInspector) oi).getPrimitiveCategory();
+        populatePrimitiveColumn(poc, oi, vectorizedBatch, columnIndex);
+    }
+
+    /**
+     * Stores a resolved field at the given position of the current batch.
+     */
+    private void addValueToColumn(int columnIndex, int rowIndex, OneField field) {
+        List<OneField> row = this.resolvedBatch.get(rowIndex);
+        row.set(columnIndex, field);
+    }
+
+    /**
+     * Converts all values of one column and stores them into the current batch.
+     * Dispatches on the Hive primitive category to the helper matching the
+     * column vector type. A missing (null) column vector yields null fields.
+     *
+     * @param primitiveCategory Hive primitive category of the column
+     * @param oi object inspector of the column, used for error reporting
+     * @param vectorizedBatch input batch of records
+     * @param columnIndex index of the column
+     * @throws UnsupportedTypeException if the category is not supported
+     */
+    private void populatePrimitiveColumn(PrimitiveCategory primitiveCategory, ObjectInspector oi, VectorizedRowBatch vectorizedBatch, int columnIndex) {
+        ColumnVector columnVector = vectorizedBatch.cols[columnIndex];
+        int batchSize = vectorizedBatch.size;
+
+        /* Case labels are PrimitiveCategory constants, DataType is qualified to avoid confusion */
+        switch (primitiveCategory) {
+            case BOOLEAN:
+                populateLongColumn((LongColumnVector) columnVector, primitiveCategory, DataType.BOOLEAN, batchSize, columnIndex);
+                break;
+            case BYTE:
+            case SHORT:
+                populateLongColumn((LongColumnVector) columnVector, primitiveCategory, DataType.SMALLINT, batchSize, columnIndex);
+                break;
+            case INT:
+                populateLongColumn((LongColumnVector) columnVector, primitiveCategory, DataType.INTEGER, batchSize, columnIndex);
+                break;
+            case LONG:
+                populateLongColumn((LongColumnVector) columnVector, primitiveCategory, DataType.BIGINT, batchSize, columnIndex);
+                break;
+            case DATE:
+                populateLongColumn((LongColumnVector) columnVector, primitiveCategory, DataType.DATE, batchSize, columnIndex);
+                break;
+            case FLOAT:
+                populateDoubleColumn((DoubleColumnVector) columnVector, true, DataType.REAL, batchSize, columnIndex);
+                break;
+            case DOUBLE:
+                populateDoubleColumn((DoubleColumnVector) columnVector, false, DataType.FLOAT8, batchSize, columnIndex);
+                break;
+            case DECIMAL:
+                populateDecimalColumn((DecimalColumnVector) columnVector, DataType.NUMERIC, batchSize, columnIndex);
+                break;
+            case VARCHAR:
+                populateTextColumn((BytesColumnVector) columnVector, DataType.VARCHAR, batchSize, columnIndex);
+                break;
+            case CHAR:
+                populateTextColumn((BytesColumnVector) columnVector, DataType.BPCHAR, batchSize, columnIndex);
+                break;
+            case STRING:
+                populateTextColumn((BytesColumnVector) columnVector, DataType.TEXT, batchSize, columnIndex);
+                break;
+            case BINARY:
+                populateBinaryColumn((BytesColumnVector) columnVector, DataType.BYTEA, batchSize, columnIndex);
+                break;
+            default: {
+                throw new UnsupportedTypeException(oi.getTypeName()
+                        + " conversion is not supported by "
+                        + getClass().getSimpleName());
+            }
+        }
+    }
+
+    /**
+     * Converts a raw long taken from a LongColumnVector into the Java value
+     * matching the given category.
+     */
+    private static Object convertLong(PrimitiveCategory category, long rawValue) {
+        switch (category) {
+            case BOOLEAN:
+                return rawValue == 1;
+            case BYTE:
+            case SHORT:
+                return (short) rawValue;
+            case INT:
+                return (int) rawValue;
+            case DATE:
+                return new Date(DateWritable.daysToMillis((int) rawValue));
+            default:
+                /* LONG */
+                return rawValue;
+        }
+    }
+
+    /**
+     * Populates a column backed by a LongColumnVector.
+     * For a repeating vector the entry at index 0 is used for every row.
+     */
+    private void populateLongColumn(LongColumnVector lcv, PrimitiveCategory category, DataType fieldType, int batchSize, int columnIndex) {
+        for (int rowIndex = 0; rowIndex < batchSize; rowIndex++) {
+            Object fieldValue = null;
+            if (lcv != null) {
+                int rowId = lcv.isRepeating ? 0 : rowIndex;
+                if (!lcv.isNull[rowId]) {
+                    fieldValue = convertLong(category, lcv.vector[rowId]);
+                }
+            }
+            addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue));
+        }
+    }
+
+    /**
+     * Populates a column backed by a DoubleColumnVector.
+     *
+     * @param narrowToFloat true to store values as float, false to keep them as double
+     */
+    private void populateDoubleColumn(DoubleColumnVector dcv, boolean narrowToFloat, DataType fieldType, int batchSize, int columnIndex) {
+        for (int rowIndex = 0; rowIndex < batchSize; rowIndex++) {
+            Object fieldValue = null;
+            if (dcv != null) {
+                int rowId = dcv.isRepeating ? 0 : rowIndex;
+                if (!dcv.isNull[rowId]) {
+                    /* if/else on purpose: a conditional expression would promote float back to double */
+                    if (narrowToFloat) {
+                        fieldValue = (float) dcv.vector[rowId];
+                    } else {
+                        fieldValue = dcv.vector[rowId];
+                    }
+                }
+            }
+            addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue));
+        }
+    }
+
+    /**
+     * Populates a column backed by a DecimalColumnVector, values are passed through as is.
+     */
+    private void populateDecimalColumn(DecimalColumnVector dcv, DataType fieldType, int batchSize, int columnIndex) {
+        for (int rowIndex = 0; rowIndex < batchSize; rowIndex++) {
+            Object fieldValue = null;
+            if (dcv != null) {
+                int rowId = dcv.isRepeating ? 0 : rowIndex;
+                if (!dcv.isNull[rowId]) {
+                    fieldValue = dcv.vector[rowId];
+                }
+            }
+            addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue));
+        }
+    }
+
+    /**
+     * Populates a textual column (VARCHAR, CHAR, STRING) backed by a BytesColumnVector.
+     * Buffer, start and length are all read at rowId, so that a repeating vector
+     * yields the value stored at index 0 for every row.
+     */
+    private void populateTextColumn(BytesColumnVector bcv, DataType fieldType, int batchSize, int columnIndex) {
+        for (int rowIndex = 0; rowIndex < batchSize; rowIndex++) {
+            Object fieldValue = null;
+            if (bcv != null) {
+                int rowId = bcv.isRepeating ? 0 : rowIndex;
+                if (!bcv.isNull[rowId]) {
+                    Text textValue = new Text();
+                    textValue.set(bcv.vector[rowId], bcv.start[rowId], bcv.length[rowId]);
+                    fieldValue = textValue;
+                }
+            }
+            addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue));
+        }
+    }
+
+    /**
+     * Populates a binary column backed by a BytesColumnVector, each value is
+     * copied into its own byte array.
+     */
+    private void populateBinaryColumn(BytesColumnVector bcv, DataType fieldType, int batchSize, int columnIndex) {
+        for (int rowIndex = 0; rowIndex < batchSize; rowIndex++) {
+            Object fieldValue = null;
+            if (bcv != null) {
+                int rowId = bcv.isRepeating ? 0 : rowIndex;
+                if (!bcv.isNull[rowId]) {
+                    byte[] bytes = new byte[bcv.length[rowId]];
+                    System.arraycopy(bcv.vector[rowId], bcv.start[rowId], bytes, 0, bcv.length[rowId]);
+                    fieldValue = bytes;
+                }
+            }
+            addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
index f36f074..7294a02 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hawq.pxf.api.Metadata;
/**
* Factory class which returns optimal profile for given input format
@@ -35,16 +34,20 @@ public class ProfileFactory {
private static final String HIVE_RC_PROFILE = "HiveRC";
private static final String HIVE_ORC_PROFILE = "HiveORC";
private static final String HIVE_PROFILE = "Hive";
+ private static final String HIVE_ORC_VECTORIZED_PROFILE = "HiveVectorizedORC";
/**
* The method which returns optimal profile
*
* @param inputFormat input format of table/partition
* @param hasComplexTypes whether record has complex types, see @EnumHiveToHawqType
+ * @param userProfileName profile name provided by user
* @return name of optimal profile
*/
- public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
+ public static String get(InputFormat inputFormat, boolean hasComplexTypes, String userProfileName) {
String profileName = null;
+ if (HIVE_ORC_VECTORIZED_PROFILE.equals(userProfileName))
+ return userProfileName;
if (inputFormat instanceof TextInputFormat && !hasComplexTypes) {
profileName = HIVE_TEXT_PROFILE;
} else if (inputFormat instanceof RCFileInputFormat) {
@@ -58,4 +61,12 @@ public class ProfileFactory {
return profileName;
}
+    /**
+     * Returns optimal profile without taking a user provided profile name
+     * into account.
+     *
+     * @param inputFormat input format of table/partition
+     * @param hasComplexTypes whether record has complex types
+     * @return name of optimal profile
+     * @see ProfileFactory#get(InputFormat, boolean, String)
+     */
+    public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
+        return get(inputFormat, hasComplexTypes, null);
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
index 1c199d3..f9dbd72 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -137,6 +137,19 @@ public class BridgeOutputBuilder {
return outputList;
}
+ public LinkedList<Writable> makeVectorizedOutput(List<List<OneField>> recordsBatch) throws BadRecordException {
+ outputList.clear();
+ if (recordsBatch != null) {
+ for (List<OneField> record : recordsBatch) {
+ if (inputData.outputFormat() == OutputFormat.GPDBWritable) {
+ makeGPDBWritableOutput();
+ }
+ fillOutputRecord(record);
+ }
+ }
+ return outputList;
+ }
+
/**
* Returns whether or not this is a partial line.
*
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
index edd0a99..dd095dd 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -166,7 +166,7 @@ public class ReadBridge implements Bridge {
* analyzing the exception type, and when we discover that the actual
* problem was a data problem, we return the errorOutput GPDBWritable.
*/
- private boolean isDataException(IOException ex) {
+ protected boolean isDataException(IOException ex) {
return (ex instanceof EOFException
|| ex instanceof CharacterCodingException
|| ex instanceof CharConversionException
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
new file mode 100644
index 0000000..ca222f1
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
@@ -0,0 +1,102 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+
+ /**
+ * Read bridge that consumes whole batches from the accessor instead of
+ * single rows. Every object read from the accessor is passed to a
+ * {@link ReadVectorizedResolver}, which expands it into a list of records;
+ * the resulting writables are queued and handed out one per
+ * {@link #getNext()} call.
+ */
+ public class ReadVectorizedBridge extends ReadBridge {
+
+ private static final Log LOG = LogFactory.getLog(ReadVectorizedBridge.class);
+
+ /**
+ * Creates the bridge; all setup is delegated to the parent constructor.
+ *
+ * @param protData protocol data forwarded unchanged to {@link ReadBridge}
+ * @throws Exception if the parent constructor fails
+ */
+ public ReadVectorizedBridge(ProtocolData protData) throws Exception {
+ super(protData);
+ }
+
+ /**
+ * Returns the next output object. Queued output from a previously
+ * resolved batch is returned first; otherwise batches are read and
+ * resolved until one produces output or the accessor is exhausted.
+ *
+ * @return the next writable, a partial line left at the end of the
+ * fragment, an error output for data problems, or {@code null} when
+ * there is nothing more to return
+ * @throws Exception if reading or resolving fails with anything other
+ * than a data exception or a {@link BadRecordException}
+ */
+ @Override
+ public Writable getNext() throws Exception {
+ Writable output = null;
+ OneRow batch = null;
+
+ // Drain what is left of the previously resolved batch first.
+ if (!outputQueue.isEmpty()) {
+ return outputQueue.pop();
+ }
+
+ try {
+ while (outputQueue.isEmpty()) {
+ batch = fileAccessor.readNextObject();
+ if (batch == null) {
+ output = outputBuilder.getPartialLine();
+ if (output != null) {
+ LOG.warn("A partial record in the end of the fragment");
+ }
+ // if there is a partial line, return it now, otherwise it
+ // will return null
+ return output;
+ }
+
+ // The queue is known to be empty here, so it can be replaced.
+ // The cast throws ClassCastException if the configured resolver
+ // does not implement ReadVectorizedResolver.
+ List<List<OneField>> resolvedBatch = ((ReadVectorizedResolver) fieldsResolver).getFieldsForBatch(batch);
+ outputQueue = outputBuilder.makeVectorizedOutput(resolvedBatch);
+ if (!outputQueue.isEmpty()) {
+ // Stop here: popping may empty the queue again, and looping
+ // on would overwrite the output we just took.
+ output = outputQueue.pop();
+ break;
+ }
+ }
+ } catch (IOException ex) {
+ // Only data problems are converted to error output; anything else
+ // is propagated to the caller.
+ if (!isDataException(ex)) {
+ throw ex;
+ }
+ output = outputBuilder.getErrorOutput(ex);
+ } catch (BadRecordException ex) {
+ // Stringifying a whole batch can be costly, so build the message
+ // only when it will actually be logged.
+ if (LOG.isDebugEnabled()) {
+ String rowInfo = (batch != null) ? batch.toString() : "null";
+ if (ex.getCause() != null) {
+ LOG.debug("BadRecordException " + ex.getCause().toString()
+ + ": " + rowInfo);
+ } else {
+ LOG.debug(ex.toString() + ": " + rowInfo);
+ }
+ }
+ output = outputBuilder.getErrorOutput(ex);
+ }
+
+ return output;
+ }
+
+ /**
+ * Ends the read iteration by closing the accessor for reading.
+ *
+ * @throws Exception if closing the accessor fails
+ */
+ @Override
+ public void endIteration() throws Exception {
+ fileAccessor.closeForRead();
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 4294e09..027663b 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -44,6 +44,7 @@ import org.apache.hawq.pxf.service.AggBridge;
import org.apache.hawq.pxf.service.Bridge;
import org.apache.hawq.pxf.service.ReadBridge;
import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.ReadVectorizedBridge;
import org.apache.hawq.pxf.service.io.Writable;
import org.apache.hawq.pxf.service.utilities.ProtocolData;
import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
@@ -101,6 +102,8 @@ public class BridgeResource extends RestResource {
bridge = new ReadSamplingBridge(protData);
} else if (Utilities.useAggBridge(protData)) {
bridge = new AggBridge(protData);
+ } else if (Utilities.useVectorization(protData)) {
+ bridge = new ReadVectorizedBridge(protData);
} else {
bridge = new ReadBridge(protData);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index f076ead..a8666eb 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -101,6 +101,17 @@ under the License.
<outputFormat>org.apache.hawq.pxf.service.io.GPDBWritable</outputFormat>
</plugins>
</profile>
+ <profile>
+ <name>HiveVectorizedORC</name>
+ <description>This profile reads Hive tables stored in ORC format using the vectorized ORC accessor and resolver</description>
+ <plugins>
+ <fragmenter>org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter>
+ <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedAccessor</accessor>
+ <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedResolver</resolver>
+ <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+ <outputFormat>org.apache.hawq.pxf.service.io.GPDBWritable</outputFormat>
+ </plugins>
+ </profile>
<profile>
<name>HdfsTextSimple</name>
<description>This profile is suitable for using when reading delimited single line records from plain text files