Posted to commits@hive.apache.org by mm...@apache.org on 2016/05/03 01:59:48 UTC
[37/40] hive git commit: HIVE-12878: Support Vectorization for TEXTFILE and other formats (Matt McCline, reviewed by Sergey Shelukhin)
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java
deleted file mode 100644
index faec0aa..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * This class extracts specified VectorizedRowBatch row columns into a Writable row Object[].
- *
- * The caller provides the hive type names and target column numbers in the order desired to
- * extract from the Writable row Object[].
- *
- * This class is for use when the batch being assigned is always the same.
- */
-public class VectorExtractRowSameBatch extends VectorExtractRow {
-
- public void setOneBatch(VectorizedRowBatch batch) throws HiveException {
- setBatch(batch);
- }
-}
\ No newline at end of file
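
Note on the deletion above: the SameBatch wrapper (and the DynBatch counterpart removed in the files below) existed only to remember which VectorizedRowBatch the extractor was bound to. After this patch the batch is passed explicitly on every call, so the plain VectorExtractRow/VectorAssignRow classes are used directly. A minimal sketch of the new call shape, based only on the method signatures visible in this diff (the surrounding class is illustrative):

    import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ExtractRowCallSketch {
      // Old pattern (removed): bind the helper to one batch up front.
      //   extractHelper.setOneBatch(batch);              // VectorExtractRowSameBatch
      //   extractHelper.extractRow(batchIndex, row);
      // New pattern: the batch travels with every call, so one helper instance
      // can serve any batch with no per-entry/per-exit bookkeeping.
      static void extractOne(VectorExtractRow extractRow, VectorizedRowBatch batch,
          int batchIndex, Object[] row) throws HiveException {
        extractRow.extractRow(batch, batchIndex, row);
      }
    }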
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index a3082c3..ff88b85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -43,7 +43,7 @@ public class VectorFileSinkOperator extends FileSinkOperator {
private transient boolean firstBatch;
- private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ private transient VectorExtractRow vectorExtractRow;
protected transient Object[] singleRow;
@@ -80,30 +80,26 @@ public class VectorFileSinkOperator extends FileSinkOperator {
public void process(Object data, int tag) throws HiveException {
VectorizedRowBatch batch = (VectorizedRowBatch) data;
if (firstBatch) {
- vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
- vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
- singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ singleRow = new Object[vectorExtractRow.getCount()];
firstBatch = false;
}
- vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
if (batch.selectedInUse) {
int selected[] = batch.selected;
for (int logical = 0 ; logical < batch.size; logical++) {
int batchIndex = selected[logical];
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
} else {
for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
}
-
- vectorExtractRowDynBatch.forgetBatchOnExit();
}
}
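
The hunk above shows the recurring sink-side pattern in this patch: initialize VectorExtractRow lazily on the first batch, then walk the batch row by row, honouring the selected[] indirection when selectedInUse is set. A self-contained sketch of that walk; the RowSink interface is a hypothetical stand-in for super.process():

    import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class BatchDrainSketch {
      interface RowSink { void accept(Object[] row) throws HiveException; }

      // Walks the logical rows of a batch, mapping through selected[] when
      // selectedInUse is set, and hands each materialized row to the sink --
      // the same shape as VectorFileSinkOperator.process().
      static void drain(VectorExtractRow extractRow, VectorizedRowBatch batch,
          Object[] singleRow, RowSink sink) throws HiveException {
        if (batch.selectedInUse) {
          int[] selected = batch.selected;
          for (int logical = 0; logical < batch.size; logical++) {
            int batchIndex = selected[logical];
            extractRow.extractRow(batch, batchIndex, singleRow);
            sink.accept(singleRow);
          }
        } else {
          for (int batchIndex = 0; batchIndex < batch.size; batchIndex++) {
            extractRow.extractRow(batch, batchIndex, singleRow);
            sink.accept(singleRow);
          }
        }
      }
    }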
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index f20f614..98a9bf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -103,7 +103,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
private transient VectorizedRowBatch outputBatch;
private transient VectorizedRowBatchCtx vrbCtx;
- private transient VectorAssignRowSameBatch vectorAssignRowSameBatch;
+ private transient VectorAssignRow vectorAssignRow;
private transient int numEntriesHashTable;
@@ -823,9 +823,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
vrbCtx = new VectorizedRowBatchCtx();
vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
- vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
- vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
- vectorAssignRowSameBatch.setOneBatch(outputBatch);
+ vectorAssignRow = new VectorAssignRow();
+ vectorAssignRow.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
}
} catch (HiveException he) {
@@ -912,11 +911,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
} else {
// Output keys and aggregates into the output batch.
for (int i = 0; i < outputKeyLength; ++i) {
- vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++,
+ vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
keyWrappersBatch.getWritableKeyValue (kw, i, keyOutputWriters[i]));
}
for (int i = 0; i < aggregators.length; ++i) {
- vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++,
+ vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)));
}
++outputBatch.size;
@@ -937,7 +936,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
throws HiveException {
int fi = outputKeyLength; // Start after group keys.
for (int i = 0; i < aggregators.length; ++i) {
- vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++,
+ vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)));
}
++outputBatch.size;
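
For the group-by flush path, keys and aggregate results are now written column by column into the shared output batch, with the batch passed explicitly. A small sketch of that append, assuming assignRowColumn accepts the writable key and aggregate objects as the calls above do:

    import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class GroupByOutputSketch {
      // Appends one output row (group keys followed by aggregate results) to
      // the batch, column by column, at row index outputBatch.size.
      static void appendRow(VectorAssignRow assignRow, VectorizedRowBatch outputBatch,
          Object[] keyValues, Object[] aggregateValues) throws HiveException {
        int fi = 0;
        for (Object key : keyValues) {
          assignRow.assignRowColumn(outputBatch, outputBatch.size, fi++, key);
        }
        for (Object agg : aggregateValues) {
          assignRow.assignRowColumn(outputBatch, outputBatch.size, fi++, agg);
        }
        ++outputBatch.size;   // the row only becomes visible once size is bumped
      }
    }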
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index 6bed52f..902a183 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -59,7 +59,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
protected transient VectorizedRowBatch outputBatch;
protected transient VectorizedRowBatch scratchBatch; // holds restored (from disk) big table rows
- protected transient Map<ObjectInspector, VectorAssignRowSameBatch> outputVectorAssignRowMap;
+ protected transient Map<ObjectInspector, VectorAssignRow> outputVectorAssignRowMap;
protected transient VectorizedRowBatchCtx vrbCtx = null;
@@ -100,7 +100,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
outputBatch = vrbCtx.createVectorizedRowBatch();
- outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>();
+ outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRow>();
}
/**
@@ -109,15 +109,14 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
@Override
protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException {
Object[] values = (Object[]) row;
- VectorAssignRowSameBatch va = outputVectorAssignRowMap.get(outputOI);
+ VectorAssignRow va = outputVectorAssignRowMap.get(outputOI);
if (va == null) {
- va = new VectorAssignRowSameBatch();
+ va = new VectorAssignRow();
va.init((StructObjectInspector) outputOI, vOutContext.getProjectedColumns());
- va.setOneBatch(outputBatch);
outputVectorAssignRowMap.put(outputOI, va);
}
- va.assignRow(outputBatch.size, values);
+ va.assignRow(outputBatch, outputBatch.size, values);
++outputBatch.size;
if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
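
internalForward keeps one VectorAssignRow per output ObjectInspector because rows can arrive with different inspectors (for example restored big-table rows versus in-memory rows). A sketch of that cache, assuming getProjectedColumns() yields a List<Integer> as its use above suggests; the flush step is only noted in a comment:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    public class ForwardRowSketch {
      private final Map<ObjectInspector, VectorAssignRow> assignRowMap = new HashMap<>();

      // Lazily builds one VectorAssignRow per output ObjectInspector, then
      // appends the row into the shared output batch.
      void forward(Object[] values, ObjectInspector outputOI, List<Integer> projectedColumns,
          VectorizedRowBatch outputBatch) throws HiveException {
        VectorAssignRow va = assignRowMap.get(outputOI);
        if (va == null) {
          va = new VectorAssignRow();
          va.init((StructObjectInspector) outputOI, projectedColumns);
          assignRowMap.put(outputOI, va);
        }
        va.assignRow(outputBatch, outputBatch.size, values);
        ++outputBatch.size;
        if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
          // The real operator forwards the full batch downstream and resets
          // outputBatch.size to 0 at this point.
        }
      }
    }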
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index e8f4471..3323df3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -181,7 +181,9 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
joinValues[posBigTable] = vectorNodeEvaluators;
// Filtering is handled in the input batch processing
- filterMaps[posBigTable] = null;
+ if (filterMaps != null) {
+ filterMaps[posBigTable] = null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
index 0fe1188..22bebb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
@@ -44,7 +44,7 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato
private transient boolean firstBatch;
- private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ private transient VectorExtractRow vectorExtractRow;
protected transient Object[] singleRow;
@@ -94,33 +94,28 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato
}
if (firstBatch) {
- vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
- vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
- singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ singleRow = new Object[vectorExtractRow.getCount()];
firstBatch = false;
}
-
- vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
// VectorizedBatchUtil.debugDisplayBatch( batch, "VectorReduceSinkOperator processOp ");
if (batch.selectedInUse) {
int selected[] = batch.selected;
for (int logical = 0 ; logical < batch.size; logical++) {
int batchIndex = selected[logical];
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
} else {
for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
}
-
- vectorExtractRowDynBatch.forgetBatchOnExit();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index 9f0c24e..6979956 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -18,20 +18,422 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Writable;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
-public class VectorMapOperator extends MapOperator {
+/*
+ *
+ * The vectorized MapOperator.
+ *
+ * There are 3 modes of reading for vectorization:
+ *
+ * 1) One for the Vectorized Input File Format which returns VectorizedRowBatch as the row.
+ *
+ * 2) One for using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch.
+ * Currently, these Input File Formats:
+ * TEXTFILE
+ * SEQUENCEFILE
+ *
+ * 3) And one using the regular partition deserializer to get the row object and assigning
+ * the row object into the VectorizedRowBatch with VectorAssignRow.
+ * This picks up Input File Format not supported by the other two.
+ */
+public class VectorMapOperator extends AbstractMapOperator {
private static final long serialVersionUID = 1L;
+ /*
+ * Overall information on this vectorized Map operation.
+ */
+ private transient HashMap<String, VectorPartitionContext> fileToPartitionContextMap;
+
+ private transient Operator<? extends OperatorDesc> oneRootOperator;
+
+ private transient TypeInfo tableStructTypeInfo;
+ private transient StandardStructObjectInspector tableStandardStructObjectInspector;
+
+ private transient TypeInfo[] tableRowTypeInfos;
+
+ private transient VectorizedRowBatchCtx batchContext;
+ // The context for creating the VectorizedRowBatch for this Map node that
+ // the Vectorizer class determined.
+
+ /*
+ * A different batch for vectorized Input File Format readers so they can do their work
+ * overlapped with work of the row collection that vector/row deserialization does. This allows
+ * the partitions to mix modes (e.g. for us to flush the previously batched rows on file change).
+ */
+ private transient VectorizedRowBatch vectorizedInputFileFormatBatch;
+
+ /*
+ * This batch is only used by vector/row deserializer readers.
+ */
+ private transient VectorizedRowBatch deserializerBatch;
+
+ private transient long batchCounter;
+
+ private transient int dataColumnCount;
+ private transient int partitionColumnCount;
+ private transient Object[] partitionValues;
+
+ private transient boolean[] columnsToIncludeTruncated;
+
+ /*
+ * The following members have context information for the current partition file being read.
+ */
+ private transient VectorMapOperatorReadType currentReadType;
+ private transient VectorPartitionContext currentVectorPartContext;
+ // Current vector map operator read type and context.
+
+ private transient int currentDataColumnCount;
+ // The number of data columns that the current reader will return.
+ // Only applicable for vector/row deserialization.
+
+ private transient DeserializeRead currentDeserializeRead;
+ private transient VectorDeserializeRow currentVectorDeserializeRow;
+ // When we are doing vector deserialization, these are the fast deserializer and
+ // the vector row deserializer.
+
+ private Deserializer currentPartDeserializer;
+ private StructObjectInspector currentPartRawRowObjectInspector;
+ private VectorAssignRow currentVectorAssign;
+ // When we are doing row deserialization, these are the regular deserializer,
+ // partition object inspector, and vector row assigner.
+
+ /*
+ * The abstract context for the 3 kinds of vectorized reading.
+ */
+ protected abstract class VectorPartitionContext {
+
+ protected final PartitionDesc partDesc;
+
+ String tableName;
+ String partName;
+
+ /*
+ * Initialization here is adapted from MapOperator.MapOpCtx.initObjectInspector method.
+ */
+ private VectorPartitionContext(PartitionDesc partDesc) {
+ this.partDesc = partDesc;
+
+ TableDesc td = partDesc.getTableDesc();
+
+ // Use table properties in case of unpartitioned tables,
+ // and the union of table properties and partition properties, with partition
+ // taking precedence, in the case of partitioned tables
+ Properties overlayedProps =
+ SerDeUtils.createOverlayedProperties(td.getProperties(), partDesc.getProperties());
+
+ Map<String, String> partSpec = partDesc.getPartSpec();
+
+ tableName = String.valueOf(overlayedProps.getProperty("name"));
+ partName = String.valueOf(partSpec);
+
+ }
+
+ public PartitionDesc getPartDesc() {
+ return partDesc;
+ }
+
+ /*
+ * Override this for concrete initialization.
+ */
+ public abstract void init(Configuration hconf)
+ throws SerDeException, Exception;
+
+ /*
+ * How many data columns is the partition reader actually supplying?
+ */
+ public abstract int getReaderDataColumnCount();
+ }
+
+ /*
+ * Context for reading a Vectorized Input File Format.
+ */
+ protected class VectorizedInputFileFormatPartitionContext extends VectorPartitionContext {
+
+ private VectorizedInputFileFormatPartitionContext(PartitionDesc partDesc) {
+ super(partDesc);
+ }
+
+ public void init(Configuration hconf) {
+ }
+
+ @Override
+ public int getReaderDataColumnCount() {
+ throw new RuntimeException("Not applicable");
+ }
+ }
+
+ /*
+ * Context for using VectorDeserializeRow to deserialize each row from the Input File Format
+ * into the VectorizedRowBatch.
+ */
+ protected class VectorDeserializePartitionContext extends VectorPartitionContext {
+
+ // This helper object deserializes known deserialization / input file format combination into
+ // columns of a row in a vectorized row batch.
+ private VectorDeserializeRow vectorDeserializeRow;
+
+ private DeserializeRead deserializeRead;
+
+ private int readerColumnCount;
+
+ private VectorDeserializePartitionContext(PartitionDesc partDesc) {
+ super(partDesc);
+ }
+
+ public VectorDeserializeRow getVectorDeserializeRow() {
+ return vectorDeserializeRow;
+ }
+
+ DeserializeRead getDeserializeRead() {
+ return deserializeRead;
+ }
+
+ @Override
+ public int getReaderDataColumnCount() {
+ return readerColumnCount;
+ }
+
+ public void init(Configuration hconf)
+ throws SerDeException, HiveException {
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+
+ // This type information specifies the data types the partition needs to read.
+ TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos();
+
+ switch (vectorPartDesc.getVectorDeserializeType()) {
+ case LAZY_SIMPLE:
+ {
+ LazySerDeParameters simpleSerdeParams =
+ new LazySerDeParameters(hconf, partDesc.getTableDesc().getProperties(),
+ LazySimpleSerDe.class.getName());
+
+ LazySimpleDeserializeRead lazySimpleDeserializeRead =
+ new LazySimpleDeserializeRead(dataTypeInfos, simpleSerdeParams);
+
+ vectorDeserializeRow =
+ new VectorDeserializeRow<LazySimpleDeserializeRead>(lazySimpleDeserializeRead);
+
+ // Initialize with data row type conversion parameters.
+ readerColumnCount =
+ vectorDeserializeRow.initConversion(tableRowTypeInfos, columnsToIncludeTruncated);
+
+ deserializeRead = lazySimpleDeserializeRead;
+ }
+ break;
+
+ case LAZY_BINARY:
+ {
+ LazyBinaryDeserializeRead lazyBinaryDeserializeRead =
+ new LazyBinaryDeserializeRead(dataTypeInfos);
+
+ vectorDeserializeRow =
+ new VectorDeserializeRow<LazyBinaryDeserializeRead>(lazyBinaryDeserializeRead);
+
+ // Initialize with data row type conversion parameters.
+ readerColumnCount =
+ vectorDeserializeRow.initConversion(tableRowTypeInfos, columnsToIncludeTruncated);
+
+ deserializeRead = lazyBinaryDeserializeRead;
+ }
+ break;
+
+ default:
+ throw new RuntimeException(
+ "Unexpected vector deserialize row type " + vectorPartDesc.getVectorDeserializeType().name());
+ }
+ }
+ }
+
+ /*
+ * Context for reading using the regular partition deserializer to get the row object and
+ * assigning the row object into the VectorizedRowBatch with VectorAssignRow
+ */
+ protected class RowDeserializePartitionContext extends VectorPartitionContext {
+
+ private Deserializer partDeserializer;
+ private StructObjectInspector partRawRowObjectInspector;
+ private VectorAssignRow vectorAssign;
+
+ private int readerColumnCount;
+
+ private RowDeserializePartitionContext(PartitionDesc partDesc) {
+ super(partDesc);
+ }
+
+ public Deserializer getPartDeserializer() {
+ return partDeserializer;
+ }
+
+ public StructObjectInspector getPartRawRowObjectInspector() {
+ return partRawRowObjectInspector;
+ }
+
+ public VectorAssignRow getVectorAssign() {
+ return vectorAssign;
+ }
+
+ @Override
+ public int getReaderDataColumnCount() {
+ return readerColumnCount;
+ }
+
+ public void init(Configuration hconf)
+ throws Exception {
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+
+ partDeserializer = partDesc.getDeserializer(hconf);
+
+ if (partDeserializer instanceof OrcSerde) {
+
+ // UNDONE: We need to get the table schema inspector from self-describing Input File
+ // Formats like ORC. Modify the ORC serde instead? For now, this works.
+
+ partRawRowObjectInspector =
+ (StructObjectInspector) OrcStruct.createObjectInspector(tableStructTypeInfo);
+
+ } else {
+ partRawRowObjectInspector =
+ (StructObjectInspector) partDeserializer.getObjectInspector();
+ }
+
+ TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos();
+
+ vectorAssign = new VectorAssignRow();
+
+ // Initialize with data type conversion parameters.
+ readerColumnCount =
+ vectorAssign.initConversion(dataTypeInfos, tableRowTypeInfos, columnsToIncludeTruncated);
+ }
+ }
+
+ public VectorPartitionContext createAndInitPartitionContext(PartitionDesc partDesc,
+ Configuration hconf)
+ throws SerDeException, Exception {
+
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+ VectorPartitionContext vectorPartitionContext;
+ VectorMapOperatorReadType vectorMapOperatorReadType =
+ vectorPartDesc.getVectorMapOperatorReadType();
+
+ if (vectorMapOperatorReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE ||
+ vectorMapOperatorReadType == VectorMapOperatorReadType.ROW_DESERIALIZE) {
+ // Verify hive.exec.schema.evolution is true or we have an ACID table so we are producing
+ // the table schema from ORC. The Vectorizer class assures this.
+ boolean isAcid =
+ AcidUtils.isTablePropertyTransactional(partDesc.getTableDesc().getProperties());
+ Preconditions.checkState(Utilities.isSchemaEvolutionEnabled(hconf, isAcid));
+ }
+
+ switch (vectorMapOperatorReadType) {
+ case VECTORIZED_INPUT_FILE_FORMAT:
+ vectorPartitionContext = new VectorizedInputFileFormatPartitionContext(partDesc);
+ break;
+
+ case VECTOR_DESERIALIZE:
+ vectorPartitionContext = new VectorDeserializePartitionContext(partDesc);
+ break;
+
+ case ROW_DESERIALIZE:
+ vectorPartitionContext = new RowDeserializePartitionContext(partDesc);
+ break;
+
+ default:
+ throw new RuntimeException("Unexpected vector MapOperator read type " +
+ vectorMapOperatorReadType.name());
+ }
+
+ vectorPartitionContext.init(hconf);
+
+ return vectorPartitionContext;
+ }
+
+ private void determineColumnsToInclude(Configuration hconf) {
+
+ columnsToIncludeTruncated = null;
+
+ List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(hconf);
+ if (columnsToIncludeTruncatedList != null &&
+ columnsToIncludeTruncatedList.size() > 0 && columnsToIncludeTruncatedList.size() < dataColumnCount ) {
+
+ // Partitioned columns will not be in the include list.
+
+ boolean[] columnsToInclude = new boolean[dataColumnCount];
+ Arrays.fill(columnsToInclude, false);
+ for (int columnNum : columnsToIncludeTruncatedList) {
+ columnsToInclude[columnNum] = true;
+ }
+
+ // Work backwards to find the highest wanted column.
+
+ int highestWantedColumnNum = -1;
+ for (int i = dataColumnCount - 1; i >= 0; i--) {
+ if (columnsToInclude[i]) {
+ highestWantedColumnNum = i;
+ break;
+ }
+ }
+ if (highestWantedColumnNum == -1) {
+ throw new RuntimeException("No columns to include?");
+ }
+ int newColumnCount = highestWantedColumnNum + 1;
+ if (newColumnCount == dataColumnCount) {
+ columnsToIncludeTruncated = columnsToInclude;
+ } else {
+ columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
+ }
+ }
+ }
+
/** Kryo ctor. */
- @VisibleForTesting
public VectorMapOperator() {
super();
}
@@ -40,29 +442,445 @@ public class VectorMapOperator extends MapOperator {
super(ctx);
}
+
+ /*
+ * This is the same as the setChildren method below but for empty tables.
+ */
+ @Override
+ public void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)
+ throws SerDeException, Exception {
+
+ // Get the single TableScanOperator. Vectorization only supports one input tree.
+ Preconditions.checkState(children.size() == 1);
+ oneRootOperator = children.get(0);
+
+ internalSetChildren(hconf);
+ }
+
+ @Override
+ public void setChildren(Configuration hconf) throws Exception {
+
+ // Get the single TableScanOperator. Vectorization only supports one input tree.
+ Iterator<Operator<? extends OperatorDesc>> aliasToWorkIterator =
+ conf.getAliasToWork().values().iterator();
+ oneRootOperator = aliasToWorkIterator.next();
+ Preconditions.checkState(!aliasToWorkIterator.hasNext());
+
+ internalSetChildren(hconf);
+ }
+
+ /*
+ * Create information for vector map operator.
+ * The member oneRootOperator has been set.
+ */
+ private void internalSetChildren(Configuration hconf) throws Exception {
+
+ // The setupPartitionContextVars uses the prior read type to flush the prior deserializerBatch,
+ // so set it here to none.
+ currentReadType = VectorMapOperatorReadType.NONE;
+
+ determineColumnsToInclude(hconf);
+
+ batchContext = conf.getVectorizedRowBatchCtx();
+
+ /*
+ * Use a different batch for vectorized Input File Format readers so they can do their work
+ * overlapped with work of the row collection that vector/row deserialization does. This allows
+ * the partitions to mix modes (e.g. for us to flush the previously batched rows on file change).
+ */
+ vectorizedInputFileFormatBatch =
+ batchContext.createVectorizedRowBatch(columnsToIncludeTruncated);
+ conf.setVectorizedRowBatch(vectorizedInputFileFormatBatch);
+
+ /*
+ * This batch is used by vector/row deserializer readers.
+ */
+ deserializerBatch = batchContext.createVectorizedRowBatch(columnsToIncludeTruncated);
+
+ batchCounter = 0;
+
+ dataColumnCount = batchContext.getDataColumnCount();
+ partitionColumnCount = batchContext.getPartitionColumnCount();
+ partitionValues = new Object[partitionColumnCount];
+
+ /*
+ * Create table related objects
+ */
+ tableStructTypeInfo =
+ TypeInfoFactory.getStructTypeInfo(
+ Arrays.asList(batchContext.getRowColumnNames()),
+ Arrays.asList(batchContext.getRowColumnTypeInfos()));
+ tableStandardStructObjectInspector =
+ (StandardStructObjectInspector)
+ TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(tableStructTypeInfo);
+
+ tableRowTypeInfos = batchContext.getRowColumnTypeInfos();
+
+ /*
+ * The Vectorizer class enforces that there is only one TableScanOperator, so
+ * we don't need the more complicated multiple root operator mapping that MapOperator has.
+ */
+ fileToPartitionContextMap = new HashMap<String, VectorPartitionContext>();
+
+ // Temporary map so we only create one partition context entry.
+ HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap =
+ new HashMap<PartitionDesc, VectorPartitionContext>();
+
+ for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
+ String path = entry.getKey();
+ PartitionDesc partDesc = conf.getPathToPartitionInfo().get(path);
+ ArrayList<String> aliases = entry.getValue();
+
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+ LOG.info("VectorMapOperator path: " + path +
+ ", read type " + vectorPartDesc.getVectorMapOperatorReadType().name() +
+ ", vector deserialize type " + vectorPartDesc.getVectorDeserializeType().name() +
+ ", aliases " + aliases);
+
+ VectorPartitionContext vectorPartitionContext;
+ if (!partitionContextMap.containsKey(partDesc)) {
+ vectorPartitionContext = createAndInitPartitionContext(partDesc, hconf);
+ partitionContextMap.put(partDesc, vectorPartitionContext);
+ } else {
+ vectorPartitionContext = partitionContextMap.get(partDesc);
+ }
+
+ fileToPartitionContextMap.put(path, vectorPartitionContext);
+ }
+
+ // Create list of one.
+ List<Operator<? extends OperatorDesc>> children =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ children.add(oneRootOperator);
+
+ setChildOperators(children);
+ }
+
+ @Override
+ public void initializeMapOperator(Configuration hconf) throws HiveException {
+ super.initializeMapOperator(hconf);
+
+ oneRootOperator.initialize(hconf, new ObjectInspector[] {tableStandardStructObjectInspector});
+ }
+
+ public void initializeContexts() throws HiveException {
+ Path fpath = getExecContext().getCurrentInputPath();
+ String nominalPath = getNominalPath(fpath);
+ setupPartitionContextVars(nominalPath);
+ }
+
+ // Find context for current input file
+ @Override
+ public void cleanUpInputFileChangedOp() throws HiveException {
+ super.cleanUpInputFileChangedOp();
+ Path fpath = getExecContext().getCurrentInputPath();
+ String nominalPath = getNominalPath(fpath);
+
+ setupPartitionContextVars(nominalPath);
+
+ // Add alias, table name, and partitions to hadoop conf so that their
+ // children will inherit these
+ oneRootOperator.setInputContext(currentVectorPartContext.tableName,
+ currentVectorPartContext.partName);
+ }
+
+ /*
+ * Setup the context for reading from the next partition file.
+ */
+ private void setupPartitionContextVars(String nominalPath) throws HiveException {
+
+ currentVectorPartContext = fileToPartitionContextMap.get(nominalPath);
+ PartitionDesc partDesc = currentVectorPartContext.getPartDesc();
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+ currentReadType = vectorPartDesc.getVectorMapOperatorReadType();
+
+ /*
+ * Setup for 3 different kinds of vectorized reading supported:
+ *
+ * 1) Read the Vectorized Input File Format which returns VectorizedRowBatch as the row.
+ *
+ * 2) Read using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch.
+ *
+ * 3) And read using the regular partition deserializer to get the row object and assigning
+ * the row object into the VectorizedRowBatch with VectorAssignRow.
+ */
+ if (currentReadType == VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+
+ /*
+ * The Vectorized Input File Format reader is responsible for setting the partition column
+ * values, resetting and filling in the batch, etc.
+ */
+
+ /*
+ * Clear all the reading variables.
+ */
+ currentDataColumnCount = 0;
+
+ currentDeserializeRead = null;
+ currentVectorDeserializeRow = null;
+
+ currentPartDeserializer = null;
+ currentPartRawRowObjectInspector = null;
+ currentVectorAssign = null;
+
+ } else {
+
+ /*
+ * We will get "regular" single rows from the Input File Format reader that we will need
+ * to {vector|row} deserialize.
+ */
+ Preconditions.checkState(
+ currentReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE ||
+ currentReadType == VectorMapOperatorReadType.ROW_DESERIALIZE);
+
+ if (deserializerBatch.size > 0) {
+
+ /*
+ * Clear out any rows in the batch from previous partition since we are going to change
+ * the repeating partition column values.
+ */
+ batchCounter++;
+ oneRootOperator.process(deserializerBatch, 0);
+ deserializerBatch.reset();
+ if (oneRootOperator.getDone()) {
+ setDone(true);
+ return;
+ }
+
+ }
+
+ /*
+ * For this particular file, how many columns will we actually read?
+ */
+ currentDataColumnCount = currentVectorPartContext.getReaderDataColumnCount();
+
+ if (currentDataColumnCount < dataColumnCount) {
+
+ /*
+ * Default any additional data columns to NULL once for the file.
+ */
+ for (int i = currentDataColumnCount; i < dataColumnCount; i++) {
+ ColumnVector colVector = deserializerBatch.cols[i];
+ colVector.isNull[0] = true;
+ colVector.noNulls = false;
+ colVector.isRepeating = true;
+ }
+ }
+
+ if (batchContext.getPartitionColumnCount() > 0) {
+
+ /*
+ * The partition columns are set once for the partition and are marked repeating.
+ */
+ VectorizedRowBatchCtx.getPartitionValues(batchContext, partDesc, partitionValues);
+ batchContext.addPartitionColsToBatch(deserializerBatch, partitionValues);
+ }
+
+ /*
+ * Set or clear the rest of the reading variables based on {vector|row} deserialization.
+ */
+ switch (currentReadType) {
+ case VECTOR_DESERIALIZE:
+ {
+ VectorDeserializePartitionContext vectorDeserPartContext =
+ (VectorDeserializePartitionContext) currentVectorPartContext;
+
+ // Set ours.
+ currentDeserializeRead = vectorDeserPartContext.getDeserializeRead();
+ currentVectorDeserializeRow = vectorDeserPartContext.getVectorDeserializeRow();
+
+ // Clear the other ones.
+ currentPartDeserializer = null;
+ currentPartRawRowObjectInspector = null;
+ currentVectorAssign = null;
+
+ }
+ break;
+
+ case ROW_DESERIALIZE:
+ {
+ RowDeserializePartitionContext rowDeserPartContext =
+ (RowDeserializePartitionContext) currentVectorPartContext;
+
+ // Clear the other ones.
+ currentDeserializeRead = null;
+ currentVectorDeserializeRow = null;
+
+ // Set ours.
+ currentPartDeserializer = rowDeserPartContext.getPartDeserializer();
+ currentPartRawRowObjectInspector = rowDeserPartContext.getPartRawRowObjectInspector();
+ currentVectorAssign = rowDeserPartContext.getVectorAssign();
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unexpected VectorMapOperator read type " +
+ currentReadType.name());
+ }
+ }
+ }
+
+ @Override
+ public Deserializer getCurrentDeserializer() {
+ // Not applicable.
+ return null;
+ }
+
@Override
public void process(Writable value) throws HiveException {
+
// A mapper can span multiple files/partitions.
- // The serializers need to be reset if the input file changed
+ // The VectorPartitionContext need to be changed if the input file changed
ExecMapperContext context = getExecContext();
if (context != null && context.inputFileChanged()) {
// The child operators cleanup if input file has changed
cleanUpInputFileChanged();
}
- // The row has been converted to comply with table schema, irrespective of partition schema.
- // So, use tblOI (and not partOI) for forwarding
- try {
- int childrenDone = 0;
- for (MapOpCtx current : currentCtxs) {
- if (!current.forward(value)) {
- childrenDone++;
+ if (!oneRootOperator.getDone()) {
+
+ /*
+ * 3 different kinds of vectorized reading supported:
+ *
+ * 1) Read the Vectorized Input File Format which returns VectorizedRowBatch as the row.
+ *
+ * 2) Read using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch.
+ *
+ * 3) And read using the regular partition deserializer to get the row object and assigning
+ * the row object into the VectorizedRowBatch with VectorAssignRow.
+ */
+ try {
+ if (currentReadType == VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+
+ /*
+ * The Vectorized Input File Format reader has already set the partition column
+ * values, reset and filled in the batch, etc.
+ *
+ * We pass the VectorizedRowBatch through here.
+ */
+ batchCounter++;
+ oneRootOperator.process(value, 0);
+ if (oneRootOperator.getDone()) {
+ setDone(true);
+ return;
+ }
+
+ } else {
+
+ /*
+ * We have a "regular" single rows from the Input File Format reader that we will need
+ * to deserialize.
+ */
+ Preconditions.checkState(
+ currentReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE ||
+ currentReadType == VectorMapOperatorReadType.ROW_DESERIALIZE);
+
+ if (deserializerBatch.size == deserializerBatch.DEFAULT_SIZE) {
+
+ /*
+ * Feed current full batch to operator tree.
+ */
+ batchCounter++;
+ oneRootOperator.process(deserializerBatch, 0);
+
+ /**
+ * Only reset the current data columns. Not any data columns defaulted to NULL
+ * because they are not present in the partition, and not partition columns.
+ */
+ for (int c = 0; c < currentDataColumnCount; c++) {
+ deserializerBatch.cols[c].reset();
+ deserializerBatch.cols[c].init();
+ }
+ deserializerBatch.selectedInUse = false;
+ deserializerBatch.size = 0;
+ deserializerBatch.endOfFile = false;
+
+ if (oneRootOperator.getDone()) {
+ setDone(true);
+ return;
+ }
+ }
+
+ /*
+ * Do the {vector|row} deserialization of the one row into the VectorizedRowBatch.
+ */
+ switch (currentReadType) {
+ case VECTOR_DESERIALIZE:
+ {
+ BinaryComparable binComp = (BinaryComparable) value;
+ currentDeserializeRead.set(binComp.getBytes(), 0, binComp.getLength());
+
+ // Deserialize and append new row using the current batch size as the index.
+ currentVectorDeserializeRow.deserialize(deserializerBatch, deserializerBatch.size++);
+ }
+ break;
+
+ case ROW_DESERIALIZE:
+ {
+ Object deserialized = currentPartDeserializer.deserialize(value);
+
+ // Note: Regardless of what the Input File Format returns, we have determined
+ // with VectorAppendRow.initConversion that only currentDataColumnCount columns
+ // have values we want.
+ //
+ // Any extra columns needed by the table schema were set to repeating null
+ // in the batch by setupPartitionContextVars.
+
+ // Convert input row to standard objects.
+ List<Object> standardObjects = new ArrayList<Object>();
+ ObjectInspectorUtils.copyToStandardObject(standardObjects, deserialized,
+ currentPartRawRowObjectInspector, ObjectInspectorCopyOption.WRITABLE);
+ if (standardObjects.size() < currentDataColumnCount) {
+ throw new HiveException("Input File Format returned row with too few columns");
+ }
+
+ // Append the deserialized standard object row using the current batch size
+ // as the index.
+ currentVectorAssign.assignRow(deserializerBatch, deserializerBatch.size++,
+ standardObjects, currentDataColumnCount);
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unexpected vector MapOperator read type " +
+ currentReadType.name());
+ }
}
+ } catch (Exception e) {
+ throw new HiveException("Hive Runtime Error while processing row ", e);
}
+ }
+ }
+
+ @Override
+ public void process(Object row, int tag) throws HiveException {
+ throw new HiveException("Hive 2 Internal error: should not be called!");
+ }
- rowsForwarded(childrenDone, ((VectorizedRowBatch)value).size);
- } catch (Exception e) {
- throw new HiveException("Hive Runtime Error while processing row ", e);
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ if (!abort && oneRootOperator != null && !oneRootOperator.getDone() &&
+ currentReadType != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+ if (deserializerBatch.size > 0) {
+ batchCounter++;
+ oneRootOperator.process(deserializerBatch, 0);
+ deserializerBatch.size = 0;
+ }
}
+ super.closeOp(abort);
+ }
+
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "MAP";
+ }
+
+ @Override
+ public OperatorType getType() {
+ return null;
}
}
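
A few notes on the new VectorMapOperator above. First, determineColumnsToInclude builds a truncated include mask: a boolean per data column only up to the highest column any reader actually needs, or null when pruning would not help. A standalone sketch of that computation:

    import java.util.Arrays;
    import java.util.List;

    public class ColumnIncludeSketch {
      // Builds the truncated "columns to include" mask handed to the
      // deserializers: one boolean per data column up to (and including) the
      // highest referenced column, or null when every column is read anyway.
      static boolean[] truncatedIncludeMask(List<Integer> readColumnIds, int dataColumnCount) {
        if (readColumnIds == null || readColumnIds.isEmpty()
            || readColumnIds.size() >= dataColumnCount) {
          return null;                         // nothing worth pruning
        }
        boolean[] include = new boolean[dataColumnCount];
        for (int columnNum : readColumnIds) {
          include[columnNum] = true;
        }
        int highest = -1;
        for (int i = dataColumnCount - 1; i >= 0; i--) {
          if (include[i]) { highest = i; break; }
        }
        // Keep only the prefix ending at the highest wanted column.
        return (highest + 1 == dataColumnCount)
            ? include
            : Arrays.copyOf(include, highest + 1);
      }
    }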
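
Second, when the current file supplies fewer data columns than the table schema, the extra columns are defaulted once per file to a repeating NULL so the per-row deserializers never have to touch them (partition columns are likewise filled once and marked repeating). A sketch of the NULL padding:

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class BatchPaddingSketch {
      // Marks the data columns the current file does not supply as a single
      // repeating NULL, once per file, so the deserializers only fill the
      // first fileDataColumnCount columns of each row.
      static void nullOutMissingColumns(VectorizedRowBatch batch,
          int fileDataColumnCount, int tableDataColumnCount) {
        for (int i = fileDataColumnCount; i < tableDataColumnCount; i++) {
          ColumnVector colVector = batch.cols[i];
          colVector.isNull[0] = true;     // entry 0 stands in for every row...
          colVector.noNulls = false;
          colVector.isRepeating = true;   // ...because the column repeats
        }
      }
    }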
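
Third, process() dispatches on the read type: vectorized input formats hand over a ready VectorizedRowBatch, while the two deserializing modes accumulate rows into deserializerBatch and flush it downstream whenever it fills (and, in setupPartitionContextVars, whenever the partition file changes). A condensed sketch of that control flow; the two functional interfaces are hypothetical stand-ins for the operator tree and for VectorDeserializeRow.deserialize / VectorAssignRow.assignRow:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ReadModeSketch {
      enum ReadMode { VECTORIZED_INPUT_FILE_FORMAT, VECTOR_DESERIALIZE, ROW_DESERIALIZE }

      interface BatchConsumer { void consume(VectorizedRowBatch batch) throws HiveException; }
      interface RowAppender { void append(VectorizedRowBatch batch, int rowIndex, Object value) throws HiveException; }

      // Condensed shape of VectorMapOperator.process(): pass-through for
      // vectorized readers, accumulate-and-flush for the deserializing modes.
      static void processValue(ReadMode mode, Object value, VectorizedRowBatch deserializerBatch,
          RowAppender appender, BatchConsumer downstream) throws HiveException {
        if (mode == ReadMode.VECTORIZED_INPUT_FILE_FORMAT) {
          // The reader already built and populated the batch.
          downstream.consume((VectorizedRowBatch) value);
          return;
        }
        if (deserializerBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
          downstream.consume(deserializerBatch);        // flush a full batch first
          deserializerBatch.selectedInUse = false;
          deserializerBatch.size = 0;
        }
        // VECTOR_DESERIALIZE appends via VectorDeserializeRow; ROW_DESERIALIZE
        // deserializes to a row object and appends via VectorAssignRow.
        appender.append(deserializerBatch, deserializerBatch.size++, value);
      }
    }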
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index 74e5130..dd5e20f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -40,7 +40,7 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
private transient boolean firstBatch;
- private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ private transient VectorExtractRow vectorExtractRow;
protected transient Object[] singleRow;
@@ -81,32 +81,28 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
VectorizedRowBatch batch = (VectorizedRowBatch) data;
if (firstBatch) {
- vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
- vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
- singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ singleRow = new Object[vectorExtractRow.getCount()];
firstBatch = false;
}
- vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
// VectorizedBatchUtil.debugDisplayBatch( batch, "VectorReduceSinkOperator processOp ");
if (batch.selectedInUse) {
int selected[] = batch.selected;
for (int logical = 0 ; logical < batch.size; logical++) {
int batchIndex = selected[logical];
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
} else {
for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
}
-
- vectorExtractRowDynBatch.forgetBatchOnExit();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 85c8506..59153c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -76,7 +76,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
private transient VectorHashKeyWrapperBatch keyWrapperBatch;
- private transient Map<ObjectInspector, VectorAssignRowSameBatch> outputVectorAssignRowMap;
+ private transient Map<ObjectInspector, VectorAssignRow> outputVectorAssignRowMap;
private transient int batchIndex = -1;
@@ -159,7 +159,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
- outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>();
+ outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRow>();
// This key evaluator translates from the vectorized VectorHashKeyWrapper format
// into the row-mode MapJoinKey
@@ -287,15 +287,14 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
@Override
protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException {
Object[] values = (Object[]) row;
- VectorAssignRowSameBatch va = outputVectorAssignRowMap.get(outputOI);
+ VectorAssignRow va = outputVectorAssignRowMap.get(outputOI);
if (va == null) {
- va = new VectorAssignRowSameBatch();
+ va = new VectorAssignRow();
va.init((StructObjectInspector) outputOI, vOutContext.getProjectedColumns());
- va.setOneBatch(outputBatch);
outputVectorAssignRowMap.put(outputOI, va);
}
- va.assignRow(outputBatch.size, values);
+ va.assignRow(outputBatch, outputBatch.size, values);
++outputBatch.size;
if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
index 3d0b571..51d1436 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
@@ -46,7 +46,7 @@ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator
private transient boolean firstBatch;
- private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ private transient VectorExtractRow vectorExtractRow;
protected transient Object[] singleRow;
@@ -82,28 +82,26 @@ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator
VectorizedRowBatch batch = (VectorizedRowBatch) row;
if (firstBatch) {
- vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
- vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
- singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ singleRow = new Object[vectorExtractRow.getCount()];
firstBatch = false;
}
- vectorExtractRowDynBatch.setBatchOnEntry(batch);
+
if (batch.selectedInUse) {
int selected[] = batch.selected;
for (int logical = 0 ; logical < batch.size; logical++) {
int batchIndex = selected[logical];
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
} else {
for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
super.process(singleRow, tag);
}
}
-
- vectorExtractRowDynBatch.forgetBatchOnExit();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
index e7ac531..2dc4d0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
@@ -42,7 +42,7 @@ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruni
protected transient boolean firstBatch;
- protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ protected transient VectorExtractRow vectorExtractRow;
protected transient Object[] singleRow;
@@ -77,27 +77,24 @@ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruni
public void process(Object data, int tag) throws HiveException {
VectorizedRowBatch batch = (VectorizedRowBatch) data;
if (firstBatch) {
- vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
- vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0],
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0],
vContext.getProjectedColumns());
- singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ singleRow = new Object[vectorExtractRow.getCount()];
firstBatch = false;
}
- vectorExtractRowDynBatch.setBatchOnEntry(batch);
ObjectInspector rowInspector = inputObjInspectors[0];
try {
Writable writableRow;
for (int logical = 0; logical < batch.size; logical++) {
int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
writableRow = serializer.serialize(singleRow, rowInspector);
writableRow.write(buffer);
}
} catch (Exception e) {
throw new HiveException(e);
}
-
- vectorExtractRowDynBatch.forgetBatchOnExit();
}
}
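
The pruning sink differs from the other sinks only in what it does with each materialized row: it serializes it and appends the bytes to a DataOutputBuffer. A sketch of that loop using the serializer calls visible above:

    import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.serde2.Serializer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Writable;

    public class PruningSinkSketch {
      // Materializes each selected row, serializes it with the operator's row
      // serializer, and appends the bytes to the pruning buffer -- the same
      // loop shape as VectorSparkPartitionPruningSinkOperator.process().
      static void writeBatch(VectorExtractRow extractRow, VectorizedRowBatch batch,
          Object[] singleRow, Serializer serializer, ObjectInspector rowInspector,
          DataOutputBuffer buffer) throws Exception {
        for (int logical = 0; logical < batch.size; logical++) {
          int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
          extractRow.extractRow(batch, batchIndex, singleRow);
          Writable writableRow = serializer.serialize(singleRow, rowInspector);
          writableRow.write(buffer);
        }
      }
    }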
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 86025ef..5c55011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -68,13 +68,11 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUD
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxTimestamp;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxIntervalDayTime;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinTimestamp;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinIntervalDayTime;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong;
@@ -171,10 +169,6 @@ public class VectorizationContext {
public VectorizationContext(String contextName, List<String> initialColumnNames) {
this.contextName = contextName;
level = 0;
- if (LOG.isDebugEnabled()) {
- LOG.debug("VectorizationContext consructor contextName " + contextName + " level "
- + level + " initialColumnNames " + initialColumnNames);
- }
this.initialColumnNames = initialColumnNames;
this.projectionColumnNames = initialColumnNames;
@@ -195,9 +189,6 @@ public class VectorizationContext {
public VectorizationContext(String contextName) {
this.contextName = contextName;
level = 0;
- if (LOG.isDebugEnabled()) {
- LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
- }
initialColumnNames = new ArrayList<String>();
projectedColumns = new ArrayList<Integer>();
projectionColumnNames = new ArrayList<String>();
@@ -213,7 +204,6 @@ public class VectorizationContext {
public VectorizationContext(String contextName, VectorizationContext vContext) {
this.contextName = contextName;
level = vContext.level + 1;
- LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
this.initialColumnNames = vContext.initialColumnNames;
this.projectedColumns = new ArrayList<Integer>();
this.projectionColumnNames = new ArrayList<String>();
@@ -485,7 +475,7 @@ public class VectorizationContext {
throw new HiveException("Could not vectorize expression: "+exprDesc.getName());
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Input Expression = " + exprDesc.getTypeInfo()
+ LOG.debug("Input Expression = " + exprDesc.toString()
+ ", Vectorized Expression = " + ve.toString());
}
return ve;
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index be04da8..9471e66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.io.IOException;
+import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,8 +29,11 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -49,12 +53,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -63,6 +69,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -70,6 +77,7 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hive.common.util.DateUtils;
public class VectorizedBatchUtil {
@@ -638,6 +646,47 @@ public class VectorizedBatchUtil {
return newBatch;
}
+ public static Writable getPrimitiveWritable(PrimitiveCategory primitiveCategory) {
+ switch (primitiveCategory) {
+ case VOID:
+ return null;
+ case BOOLEAN:
+ return new BooleanWritable(false);
+ case BYTE:
+ return new ByteWritable((byte) 0);
+ case SHORT:
+ return new ShortWritable((short) 0);
+ case INT:
+ return new IntWritable(0);
+ case LONG:
+ return new LongWritable(0);
+ case TIMESTAMP:
+ return new TimestampWritable(new Timestamp(0));
+ case DATE:
+ return new DateWritable(new Date(0));
+ case FLOAT:
+ return new FloatWritable(0);
+ case DOUBLE:
+ return new DoubleWritable(0);
+ case BINARY:
+ return new BytesWritable(ArrayUtils.EMPTY_BYTE_ARRAY);
+ case STRING:
+ return new Text(ArrayUtils.EMPTY_BYTE_ARRAY);
+ case VARCHAR:
+ return new HiveVarcharWritable(new HiveVarchar(StringUtils.EMPTY, -1));
+ case CHAR:
+ return new HiveCharWritable(new HiveChar(StringUtils.EMPTY, -1));
+ case DECIMAL:
+ return new HiveDecimalWritable();
+ case INTERVAL_YEAR_MONTH:
+ return new HiveIntervalYearMonthWritable();
+ case INTERVAL_DAY_TIME:
+ return new HiveIntervalDayTimeWritable();
+ default:
+ throw new RuntimeException("Primitive category " + primitiveCategory.name() + " not supported");
+ }
+ }
+
public static String displayBytes(byte[] bytes, int start, int length) {
StringBuilder sb = new StringBuilder();
for (int i = start; i < start + length; i++) {
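
Illustrative sketch (not part of this patch): the new VectorizedBatchUtil.getPrimitiveWritable() maps a primitive category to a freshly allocated, default-valued Writable (null for VOID; unsupported categories throw). A minimal usage sketch, assuming a caller that wants one reusable scratch Writable per output column; the column categories below are hypothetical.

import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.io.Writable;

public class ScratchWritableSketch {
  public static void main(String[] args) {
    // Hypothetical output schema: one primitive category per column.
    PrimitiveCategory[] columnCategories = {
        PrimitiveCategory.LONG, PrimitiveCategory.STRING, PrimitiveCategory.DECIMAL };

    Writable[] scratchWritables = new Writable[columnCategories.length];
    for (int i = 0; i < columnCategories.length; i++) {
      // Each entry is a zero/empty-valued Writable of the matching type.
      scratchWritables[i] = VectorizedBatchUtil.getPrimitiveWritable(columnCategories[i]);
    }

    for (Writable w : scratchWritables) {
      System.out.println(w == null ? "null (VOID)" : w.getClass().getSimpleName());
    }
  }
}
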
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 5cbace4..6a3d64b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -186,7 +186,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
int length = byteSegmentRef.getLength();
smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
- smallTableVectorDeserializeRow.deserializeByValue(batch, batchIndex);
+ smallTableVectorDeserializeRow.deserialize(batch, batchIndex);
}
// VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, "generateHashMapResultSingleValue big table");
@@ -253,7 +253,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
int length = byteSegmentRef.getLength();
smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
- smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.size);
+ smallTableVectorDeserializeRow.deserialize(overflowBatch, overflowBatch.size);
}
// VectorizedBatchUtil.debugDisplayOneRow(overflowBatch, overflowBatch.size, "generateHashMapResultMultiValue overflow");
@@ -304,7 +304,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
int length = byteSegmentRef.getLength();
smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
- smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.DEFAULT_SIZE);
+ smallTableVectorDeserializeRow.deserialize(overflowBatch, overflowBatch.DEFAULT_SIZE);
}
overflowBatch.size++;
@@ -545,7 +545,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
// LOG.debug(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length);
bigTableVectorDeserializeRow.setBytes(bytes, offset, length);
- bigTableVectorDeserializeRow.deserializeByValue(spillReplayBatch, spillReplayBatch.size);
+ bigTableVectorDeserializeRow.deserialize(spillReplayBatch, spillReplayBatch.size);
spillReplayBatch.size++;
if (spillReplayBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
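
Illustrative sketch (not part of this patch): these hunks only rename the call, so the single deserialize() entry point now covers what deserializeByValue() did. The calling pattern, assuming a VectorDeserializeRow that was constructed and initialized elsewhere (construction is not shown in these hunks):

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class DeserializeRowSketch {
  // Copies one serialized small-table row (a byte range from the hash table) into the batch.
  static void copyRow(VectorDeserializeRow deserializeRow, byte[] bytes, int offset, int length,
      VectorizedRowBatch batch, int batchIndex) throws HiveException, IOException {
    // Point the deserializer at the serialized values...
    deserializeRow.setBytes(bytes, offset, length);
    // ...and write them into the target row; this call was named deserializeByValue() before.
    deserializeRow.deserialize(batch, batchIndex);
  }
}
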
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
index 1877f14..d02d1db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
@@ -30,19 +30,19 @@ public class VectorMapJoinFastLongHashUtil {
long key = 0;
switch (hashTableKeyType) {
case BOOLEAN:
- key = (keyBinarySortableDeserializeRead.readBoolean() ? 1 : 0);
+ key = (keyBinarySortableDeserializeRead.currentBoolean ? 1 : 0);
break;
case BYTE:
- key = (long) keyBinarySortableDeserializeRead.readByte();
+ key = (long) keyBinarySortableDeserializeRead.currentByte;
break;
case SHORT:
- key = (long) keyBinarySortableDeserializeRead.readShort();
+ key = (long) keyBinarySortableDeserializeRead.currentShort;
break;
case INT:
- key = (long) keyBinarySortableDeserializeRead.readInt();
+ key = (long) keyBinarySortableDeserializeRead.currentInt;
break;
case LONG:
- key = keyBinarySortableDeserializeRead.readLong();
+ key = keyBinarySortableDeserializeRead.currentLong;
break;
default:
throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
index adb8044..985fb1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
-import org.apache.hadoop.hive.serde2.fast.DeserializeRead.ReadStringResults;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
@@ -40,8 +39,6 @@ public class VectorMapJoinFastStringCommon {
private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
- private ReadStringResults readStringResults;
-
public void adaptPutRow(VectorMapJoinFastBytesHashTable hashTable,
BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
@@ -51,9 +48,11 @@ public class VectorMapJoinFastStringCommon {
if (keyBinarySortableDeserializeRead.readCheckNull()) {
return;
}
- keyBinarySortableDeserializeRead.readString(readStringResults);
- hashTable.add(readStringResults.bytes, readStringResults.start, readStringResults.length,
+ hashTable.add(
+ keyBinarySortableDeserializeRead.currentBytes,
+ keyBinarySortableDeserializeRead.currentBytesStart,
+ keyBinarySortableDeserializeRead.currentBytesLength,
currentValue);
}
@@ -61,6 +60,5 @@ public class VectorMapJoinFastStringCommon {
this.isOuterJoin = isOuterJoin;
PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos);
- readStringResults = keyBinarySortableDeserializeRead.createReadStringResults();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 46a5413..cfedf35 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -205,8 +205,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
return inputFormat; // LLAP not enabled, no-op.
}
- boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface,
- isVectorized = Utilities.isVectorMode(conf);
+ boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface;
+ boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf);
if (!isSupported || !isVectorized) {
LOG.info("Not using llap for " + inputFormat + ": supported = " + isSupported
+ ", vectorized = " + isVectorized);
@@ -225,7 +225,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
public static boolean canWrapAnyForLlap(Configuration conf, MapWork mapWork) {
- return Utilities.isVectorMode(conf, mapWork);
+ return Utilities.getUseVectorizedInputFileFormat(conf, mapWork);
}
public static boolean canWrapForLlap(Class<? extends InputFormat> inputFormatClass) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
index c53d149..80858a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java
@@ -72,7 +72,7 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab
private boolean addPartitionCols = true;
public NullRowsRecordReader(Configuration conf, InputSplit split) throws IOException {
- boolean isVectorMode = Utilities.isVectorMode(conf);
+ boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Null record reader in " + (isVectorMode ? "" : "non-") + "vector mode");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fcb8ca4..33fe3b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -452,7 +452,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<FileStatus> files
) throws IOException {
- if (Utilities.isVectorMode(conf)) {
+ if (Utilities.getUseVectorizedInputFileFormat(conf)) {
return new VectorizedOrcInputFormat().validateInput(fs, conf, files);
}
@@ -1640,7 +1640,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
- boolean vectorMode = Utilities.isVectorMode(conf);
+ boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf);
boolean isAcidRead = isAcidRead(conf, inputSplit);
if (!isAcidRead) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index a4e35cb..5b65e5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -59,7 +59,7 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra
final org.apache.hadoop.mapred.Reporter reporter
) throws IOException {
try {
- if (Utilities.isVectorMode(job)) {
+ if (Utilities.getUseVectorizedInputFileFormat(job)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using vectorized record reader");
}