Posted to commits@hive.apache.org by ha...@apache.org on 2013/06/07 03:39:38 UTC
svn commit: r1490492 - in /hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql: exec/vector/ io/orc/
Author: hashutosh
Date: Fri Jun 7 01:39:37 2013
New Revision: 1490492
URL: http://svn.apache.org/r1490492
Log:
HIVE-4673 : Use VectorExpressionWriter to write column vectors into Writables. (Jitendra Nath Pandey via Ashutosh Chauhan)
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
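All of the hunks below share one theme: instead of asking a ColumnVector for a Writable directly (the old getWritableObject calls), callers obtain a per-column VectorExpressionWriter from VectorExpressionWriterFactory and use its writeValue(column, row) to produce the value. The self-contained Java sketch below shows the shape of that pattern; the Demo* names are illustrative stand-ins, not the actual Hive classes, and only the writeValue idea is taken from the diffs.

    // Illustrative stand-ins only -- not the real Hive vectorization classes.
    // The point: a per-column writer turns the value at a given row of a column
    // vector into a Writable, instead of the column vector handing out Writables.
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;

    public class ExpressionWriterSketch {

      // Simplified long column: values plus a null mask, like LongColumnVector.
      static class DemoLongColumn {
        final long[] vector;
        final boolean[] isNull;
        DemoLongColumn(long[] vector, boolean[] isNull) {
          this.vector = vector;
          this.isNull = isNull;
        }
      }

      // Simplified analogue of VectorExpressionWriter, reduced to one method.
      interface DemoWriter {
        Writable writeValue(DemoLongColumn column, int row);
      }

      // Writer for long columns; reuses one LongWritable to avoid per-row allocation.
      static class DemoLongWriter implements DemoWriter {
        private final LongWritable reused = new LongWritable();
        public Writable writeValue(DemoLongColumn column, int row) {
          if (column.isNull[row]) {
            return null;
          }
          reused.set(column.vector[row]);
          return reused;
        }
      }

      public static void main(String[] args) {
        DemoLongColumn col =
            new DemoLongColumn(new long[] {7, 0, 42}, new boolean[] {false, true, false});
        DemoWriter writer = new DemoLongWriter();
        for (int row = 0; row < col.vector.length; row++) {
          Writable w = writer.writeValue(col, row);
          System.out.println(w == null ? "\\N" : w.toString());
        }
      }
    }

Centralizing the conversion in writers is what lets the operators below derive an output ObjectInspector per column and reuse the same conversion for text rows, ORC structs, and toString().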
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Fri Jun 7 01:39:37 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Jo
import org.apache.hadoop.hive.ql.exec.Stat;
import org.apache.hadoop.hive.ql.exec.TerminalOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -331,7 +332,6 @@ public class VectorFileSinkOperator exte
statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
- System.out.println("Deserializer class = "+serializer.getClass().toString());
serializer.initialize(null, conf.getTableInfo().getProperties());
outputClass = serializer.getSerializedClass();
@@ -589,6 +589,10 @@ public class VectorFileSinkOperator exte
if (vectorizedSerde) {
row = records[i];
} else {
+ if (vrg.valueWriters == null) {
+ vrg.setValueWriters(VectorExpressionWriterFactory.getExpressionWriters(
+ (StructObjectInspector)inputObjInspectors[0]));
+ }
row = new Text(vrg.toString());
}
/* Create list bucketing sub-directory only if stored-as-directories is on. */
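In the FileSink hunk above, writers are attached to the batch lazily: only the first batch that has to be rendered as text for a non-vectorized serializer pays the setup cost. A minimal sketch of that first-use initialization, with illustrative stand-ins (only valueWriters and the setValueWriters idea come from the diff):

    // First-use initialization: build the writers from the input row inspector only
    // when a batch actually has to be serialized through the non-vectorized path.
    // DemoBatch and buildWritersFromInspector are illustrative stand-ins.
    public class LazyWriterInitSketch {
      static class DemoBatch {
        Object[] valueWriters;                        // real type: VectorExpressionWriter[]
        @Override public String toString() { return "r1c1\u0001r1c2"; }
      }

      private Object[] buildWritersFromInspector() {  // real code: VectorExpressionWriterFactory
        return new Object[] {"writer-0", "writer-1"}; //   .getExpressionWriters(structInspector)
      }

      String serializeAsText(DemoBatch vrg) {
        if (vrg.valueWriters == null) {               // only the first batch pays the setup cost
          vrg.valueWriters = buildWritersFromInspector();
        }
        return vrg.toString();                        // real code wraps this in new Text(...)
      }
    }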
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Fri Jun 7 01:39:37 2013
@@ -19,16 +19,21 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
/**
* Select operator implementation.
@@ -44,6 +49,8 @@ public class VectorSelectOperator extend
private int [] projectedColumns = null;
+ private VectorExpressionWriter [] valueWriters = null;
+
public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
this.vContext = ctxt;
this.conf = (SelectDesc) conf;
@@ -57,6 +64,8 @@ public class VectorSelectOperator extend
return;
}
+ List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
+
List<ExprNodeDesc> colList = conf.getColList();
vContext.setOperatorType(OperatorType.SELECT);
vExpressions = new VectorExpression[colList.size()];
@@ -66,6 +75,15 @@ public class VectorSelectOperator extend
// Update column map with output column names
vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
}
+ valueWriters = VectorExpressionWriterFactory.getExpressionWriters(colList);
+ for (VectorExpressionWriter vew : valueWriters) {
+ objectInspectors.add(vew.getObjectInspector());
+ }
+
+ List<String> outputFieldNames = conf.getOutputColumnNames();
+ outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ outputFieldNames, objectInspectors);
+
initializeChildren(hconf);
projectedColumns = new int [vExpressions.length];
for (int i = 0; i < projectedColumns.length; i++) {
@@ -97,6 +115,8 @@ public class VectorSelectOperator extend
}
// Prepare output, set the projections
+ VectorExpressionWriter [] originalValueWriters = vrg.valueWriters;
+ vrg.setValueWriters(valueWriters);
int[] originalProjections = vrg.projectedColumns;
int originalProjectionSize = vrg.projectionSize;
vrg.projectionSize = vExpressions.length;
@@ -106,6 +126,7 @@ public class VectorSelectOperator extend
// Revert the projected columns back, because vrg will be re-used.
vrg.projectionSize = originalProjectionSize;
vrg.projectedColumns = originalProjections;
+ vrg.valueWriters = originalValueWriters;
}
/**
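The processOp change above follows a save/override/forward/restore pattern: the operator temporarily installs its own projection and value writers on the shared batch, lets its children consume it, then puts the originals back because the same VectorizedRowBatch instance is reused upstream. A minimal sketch of that pattern (DemoBatch is an illustrative stand-in for VectorizedRowBatch):

    // Save/override/forward/restore: rewrite the shared batch's view (projection
    // plus value writers) for the children, then restore it, because the batch
    // object itself is reused by the caller on the next cycle.
    public class SelectForwardSketch {
      static class DemoBatch {
        int[] projectedColumns;
        int projectionSize;
        Object[] valueWriters;                       // real type: VectorExpressionWriter[]
      }

      private final int[] myProjection = {2, 0};     // columns this select projects
      private final Object[] myWriters = {"writer-for-col2", "writer-for-col0"};

      void process(DemoBatch batch) {
        // Save the caller's view of the batch.
        int[] originalProjection = batch.projectedColumns;
        int originalProjectionSize = batch.projectionSize;
        Object[] originalWriters = batch.valueWriters;

        // Install this operator's projection and writers, then hand the batch on.
        batch.projectedColumns = myProjection;
        batch.projectionSize = myProjection.length;
        batch.valueWriters = myWriters;              // real code: vrg.setValueWriters(valueWriters)
        forwardToChildren(batch);

        // Restore, since the same batch instance is reused for the next cycle.
        batch.projectedColumns = originalProjection;
        batch.projectionSize = originalProjectionSize;
        batch.valueWriters = originalWriters;
      }

      void forwardToChildren(DemoBatch batch) {
        System.out.println("forwarding " + batch.projectionSize + " projected columns");
      }
    }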
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java Fri Jun 7 01:39:37 2013
@@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.Writable;
/**
@@ -54,8 +56,7 @@ public class VectorizedRowBatch implemen
*/
public static final int DEFAULT_SIZE = 1024;
- private final Writable[] writableRow;
- private int rowIteratorIndex = 0;
+ public VectorExpressionWriter[] valueWriters = null;
/**
* Return a batch with the specified number of columns.
@@ -81,7 +82,6 @@ public class VectorizedRowBatch implemen
selected = new int[size];
selectedInUse = false;
this.cols = new ColumnVector[numCols];
- writableRow = new Writable[numCols];
projectedColumns = new int[numCols];
// Initially all columns are projected and in the same order
@@ -91,30 +91,6 @@ public class VectorizedRowBatch implemen
}
}
- public void initRowIterator(){
- this.rowIteratorIndex = 0;
- }
-
- public Writable [] getNextRow() {
- if (rowIteratorIndex >= size) {
- return null;
- }
- if (selectedInUse) {
- int i = selected[rowIteratorIndex];
- for (int k = 0; k < projectionSize; k++) {
- int c = this.projectedColumns[k];
- writableRow[c] = cols[c].getWritableObject(i);
- }
- } else {
- int i = rowIteratorIndex;
- for (int k = 0; k < projectionSize; k++) {
- int c = this.projectedColumns[k];
- writableRow[c] = cols[c].getWritableObject(i);
- }
- }
- return writableRow;
- }
-
/**
* Return count of qualifying rows.
*
@@ -130,45 +106,51 @@ public class VectorizedRowBatch implemen
return "";
}
StringBuilder b = new StringBuilder();
- if (this.selectedInUse) {
- for (int j = 0; j < size; j++) {
- int i = selected[j];
- int colIndex = 0;
- for (int k = 0; k < projectionSize; k++) {
- ColumnVector cv = cols[this.projectedColumns[k]];
- if (cv.isRepeating) {
- b.append(cv.getWritableObject(0).toString());
- } else {
- b.append(cv.getWritableObject(i).toString());
+ try {
+ if (this.selectedInUse) {
+ for (int j = 0; j < size; j++) {
+ int i = selected[j];
+ int colIndex = 0;
+ for (int k = 0; k < projectionSize; k++) {
+ int projIndex = projectedColumns[k];
+ ColumnVector cv = cols[projIndex];
+ if (cv.isRepeating) {
+ b.append(valueWriters[k].writeValue(cv, 0).toString());
+ } else {
+ b.append(valueWriters[k].writeValue(cv, i).toString());
+ }
+ colIndex++;
+ if (colIndex < cols.length) {
+ b.append('\u0001');
+ }
}
- colIndex++;
- if (colIndex < cols.length) {
- b.append('\u0001');
+ if (j < size - 1) {
+ b.append('\n');
}
}
- if (j < size-1) {
- b.append('\n');
- }
- }
- } else {
- for (int i = 0; i < size; i++) {
- int colIndex = 0;
- for (int k = 0; k < projectionSize; k++) {
- ColumnVector cv = cols[this.projectedColumns[k]];
- if (cv.isRepeating) {
- b.append(cv.getWritableObject(0).toString());
- } else {
- b.append(cv.getWritableObject(i).toString());
+ } else {
+ for (int i = 0; i < size; i++) {
+ int colIndex = 0;
+ for (int k = 0; k < projectionSize; k++) {
+ int projIndex = projectedColumns[k];
+ ColumnVector cv = cols[projIndex];
+ if (cv.isRepeating) {
+ b.append(valueWriters[k].writeValue(cv, 0).toString());
+ } else {
+ b.append(valueWriters[k].writeValue(cv, i).toString());
+ }
+ colIndex++;
+ if (colIndex < cols.length) {
+ b.append('\u0001');
+ }
}
- colIndex++;
- if (colIndex < cols.length) {
- b.append('\u0001');
+ if (i < size - 1) {
+ b.append('\n');
}
}
- if (i < size-1) {
- b.append('\n');
- }
}
+ } catch (HiveException ex) {
+ throw new RuntimeException(ex);
}
return b.toString();
}
@@ -182,5 +164,9 @@ public class VectorizedRowBatch implemen
public void write(DataOutput arg0) throws IOException {
throw new UnsupportedOperationException("Don't call me");
}
+
+ public void setValueWriters(VectorExpressionWriter[] valueWriters) {
+ this.valueWriters = valueWriters;
+ }
}
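Stripped of the diff noise, the rewritten toString() walks each row (through the selected[] indirection when it is in use), asks the matching value writer for a printable value, and joins fields with ^A and rows with newlines. A condensed, self-contained sketch of that loop; ColumnRenderer stands in for VectorExpressionWriter, and the repeating-column case (which always reads row 0 in the real code) is omitted:

    // Condensed illustration of the new toString() row loop.
    public class BatchToStringSketch {
      // Stand-in for VectorExpressionWriter, limited to what toString() needs.
      interface ColumnRenderer {
        String valueAt(int row);   // real code: valueWriters[k].writeValue(cols[projectedColumns[k]], row)
      }

      static String render(int size, boolean selectedInUse, int[] selected,
                           ColumnRenderer[] columns) {
        StringBuilder b = new StringBuilder();
        for (int j = 0; j < size; j++) {
          int row = selectedInUse ? selected[j] : j;   // resolve through selected[] if in use
          for (int k = 0; k < columns.length; k++) {
            b.append(columns[k].valueAt(row));
            if (k < columns.length - 1) {
              b.append('\u0001');                      // Hive's default field delimiter
            }
          }
          if (j < size - 1) {
            b.append('\n');
          }
        }
        return b.toString();
      }

      public static void main(String[] args) {
        long[] id = {10, 20, 30};
        String[] name = {"a", "b", "c"};
        ColumnRenderer[] cols = {
            row -> Long.toString(id[row]),
            row -> name[row]
        };
        // Only rows 0 and 2 qualify, expressed through the selected[] indirection.
        System.out.println(render(2, true, new int[] {0, 2}, cols));
      }
    }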
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Fri Jun 7 01:39:37 2013
@@ -139,7 +139,7 @@ public class OrcSerde implements SerDe,
public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
throws SerDeException {
if (vos == null) {
- vos = new VectorizedOrcSerde();
+ vos = new VectorizedOrcSerde(objInspector);
}
return vos.serialize(vrg, objInspector);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Jun 7 01:39:37 2013
@@ -552,7 +552,7 @@ class RecordReaderImpl implements Record
// Read value entries based on isNull entries
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
- result.vector[i] = SerializationUtils.readDouble(stream);
+ result.vector[i] = SerializationUtils.readFloat(stream);
} else {
// If the value is not present then set NaN
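The one-line RecordReaderImpl change fixes how FLOAT column values are decoded: a float is stored as 4 bytes per value, so reading it with the 8-byte readDouble helper consumed the wrong number of bytes and misread the bit pattern. readFloat reads the 4 bytes and the result is widened to double when stored into the vector. A self-contained sketch of that distinction; it assumes the little-endian byte layout used by ORC's SerializationUtils helpers, and readFloatLE here is illustrative, not the Hive method:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // A FLOAT column value occupies 4 bytes: read them, reassemble the IEEE 754
    // bit pattern, and let Java widen the float to double for the vector.
    public class FloatReadSketch {
      static float readFloatLE(InputStream in) throws IOException {
        int bits = in.read() | (in.read() << 8) | (in.read() << 16) | (in.read() << 24);
        return Float.intBitsToFloat(bits);
      }

      public static void main(String[] args) throws IOException {
        float original = 3.14f;
        int bits = Float.floatToIntBits(original);
        byte[] encoded = {                        // little-endian serialization of one float
            (byte) bits, (byte) (bits >> 8), (byte) (bits >> 16), (byte) (bits >> 24)};
        double[] vector = new double[1];          // stand-in for DoubleColumnVector.vector
        vector[0] = readFloatLE(new ByteArrayInputStream(encoded));   // widen float -> double
        System.out.println(vector[0]);            // prints 3.140000104904175
      }
    }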
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java?rev=1490492&r1=1490491&r2=1490492&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java Fri Jun 7 01:39:37 2013
@@ -18,7 +18,11 @@
package org.apache.hadoop.hive.ql.io.orc;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
@@ -31,43 +35,54 @@ public class VectorizedOrcSerde extends
private final Writable [] orcRowArray = new Writable [VectorizedRowBatch.DEFAULT_SIZE];
private final ObjectWritable ow = new ObjectWritable();
private final ObjectInspector inspector = null;
+ private final VectorExpressionWriter [] valueWriters;
- public VectorizedOrcSerde() {
+ public VectorizedOrcSerde(ObjectInspector objInspector) {
super();
for (int i = 0; i < orcStructArray.length; i++) {
orcRowArray[i] = new OrcSerdeRow();
}
+ try {
+ valueWriters = VectorExpressionWriterFactory
+ .getExpressionWriters((StructObjectInspector) objInspector);
+ } catch (HiveException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public Writable serialize(Object obj, ObjectInspector inspector) {
- VectorizedRowBatch batch = (VectorizedRowBatch)obj;
- for (int i = 0; i < batch.size; i++) {
- OrcStruct ost = orcStructArray[i];
- if (ost == null) {
- ost = new OrcStruct(batch.numCols);
- orcStructArray[i] = ost;
- }
- int index = 0;
- if (batch.selectedInUse) {
- index = batch.selected[i];
- } else {
- index = i;
- }
- for (int p = 0; p < batch.projectionSize; p++) {
- int k = batch.projectedColumns[p];
- Writable w;
- if (batch.cols[k].isRepeating) {
- w = batch.cols[k].getWritableObject(0);
+ VectorizedRowBatch batch = (VectorizedRowBatch) obj;
+ try {
+ for (int i = 0; i < batch.size; i++) {
+ OrcStruct ost = orcStructArray[i];
+ if (ost == null) {
+ ost = new OrcStruct(batch.numCols);
+ orcStructArray[i] = ost;
+ }
+ int index = 0;
+ if (batch.selectedInUse) {
+ index = batch.selected[i];
} else {
- w = batch.cols[k].getWritableObject(index);
+ index = i;
+ }
+ for (int p = 0; p < batch.projectionSize; p++) {
+ int k = batch.projectedColumns[p];
+ Writable w;
+ if (batch.cols[k].isRepeating) {
+ w = (Writable) valueWriters[p].writeValue(batch.cols[k], 0);
+ } else {
+ w = (Writable) valueWriters[p].writeValue(batch.cols[k], index);
+ }
+ ost.setFieldValue(k, w);
}
- ost.setFieldValue(k, w);
+ OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
+ row.realRow = ost;
+ row.inspector = inspector;
}
- OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
- row.realRow = ost;
- row.inspector = inspector;
+ } catch (HiveException ex) {
+ throw new RuntimeException(ex);
}
ow.set(orcRowArray);
return ow;