You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/09/29 20:38:46 UTC
svn commit: r1628251 - in /hive/branches/branch-0.14/ql/src:
java/org/apache/hadoop/hive/ql/exec/vector/
java/org/apache/hadoop/hive/ql/optimizer/physical/
test/results/clientpositive/tez/
Author: prasanthj
Date: Mon Sep 29 18:38:46 2014
New Revision: 1628251
URL: http://svn.apache.org/r1628251
Log:
HIVE-8226: Vectorize dynamic partitioning in VectorFileSinkOperator (Matt McCline reviewed by Prasanth J)
Modified:
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractOperator.java Mon Sep 29 18:38:46 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,8 @@ public class VectorExtractOperator exten
private int keyColCount;
private int valueColCount;
- private transient int [] projectedColumns = null;
+ private transient VectorizedRowBatch outputBatch;
+ private transient int remainingColCount;
public VectorExtractOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
@@ -57,26 +59,25 @@ public class VectorExtractOperator exten
super();
}
- private StructObjectInspector makeStandardStructObjectInspector(StructObjectInspector structObjectInspector) {
- List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ StructObjectInspector structInputObjInspector = (StructObjectInspector) inputObjInspectors[0];
+ List<? extends StructField> fields = structInputObjInspector.getAllStructFieldRefs();
ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
ArrayList<String> colNames = new ArrayList<String>();
- for (StructField field: fields) {
- colNames.add(field.getFieldName());
+ for (int i = keyColCount; i < fields.size(); i++) {
+ StructField field = fields.get(i);
+ String fieldName = field.getFieldName();
+
+ // Remove "VALUE." prefix.
+ int dotIndex = fieldName.indexOf(".");
+ colNames.add(fieldName.substring(dotIndex + 1));
ois.add(field.getFieldObjectInspector());
}
- return ObjectInspectorFactory
+ outputObjInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(colNames, ois);
- }
-
- @Override
- protected void initializeOp(Configuration hconf) throws HiveException {
- outputObjInspector = inputObjInspectors[0];
- LOG.info("VectorExtractOperator class of outputObjInspector is " + outputObjInspector.getClass().getName());
- projectedColumns = new int [valueColCount];
- for (int i = 0; i < valueColCount; i++) {
- projectedColumns[i] = keyColCount + i;
- }
+ remainingColCount = fields.size() - keyColCount;
+ outputBatch = new VectorizedRowBatch(remainingColCount);
initializeChildren(hconf);
}
@@ -86,20 +87,16 @@ public class VectorExtractOperator exten
}
@Override
- // Evaluate vectorized batches of rows and forward them.
+ // Remove the key columns and forward the values (and scratch columns).
public void processOp(Object row, int tag) throws HiveException {
- VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+ VectorizedRowBatch inputBatch = (VectorizedRowBatch) row;
+
+ // Copy references to the input columns array starting after the keys...
+ for (int i = 0; i < remainingColCount; i++) {
+ outputBatch.cols[i] = inputBatch.cols[keyColCount + i];
+ }
+ outputBatch.size = inputBatch.size;
- // Project away the key columns...
- int[] originalProjections = vrg.projectedColumns;
- int originalProjectionSize = vrg.projectionSize;
- vrg.projectionSize = valueColCount;
- vrg.projectedColumns = this.projectedColumns;
-
- forward(vrg, outputObjInspector);
-
- // Revert the projected columns back, because vrg will be re-used.
- vrg.projectionSize = originalProjectionSize;
- vrg.projectedColumns = originalProjections;
+ forward(outputBatch, outputObjInspector);
}
}
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Mon Sep 29 18:38:46 2014
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -27,16 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* File Sink operator implementation.
@@ -69,113 +58,10 @@ public class VectorFileSinkOperator exte
@Override
public void processOp(Object data, int tag) throws HiveException {
-
VectorizedRowBatch vrg = (VectorizedRowBatch)data;
-
- Writable [] records = null;
- boolean vectorizedSerde = false;
- try {
- if (serializer instanceof VectorizedSerde) {
- recordValue = ((VectorizedSerde) serializer).serializeVector(vrg,
- inputObjInspectors[0]);
- records = (Writable[]) ((ObjectWritable) recordValue).get();
- vectorizedSerde = true;
- }
- } catch (SerDeException e1) {
- throw new HiveException(e1);
- }
-
for (int i = 0; i < vrg.size; i++) {
- Writable row = null;
- if (vectorizedSerde) {
- row = records[i];
- } else {
- if (vrg.valueWriters == null) {
- vrg.setValueWriters(this.valueWriters);
- }
- try {
- row = serializer.serialize(getRowObject(vrg, i), inputObjInspectors[0]);
- } catch (SerDeException ex) {
- throw new HiveException(ex);
- }
- }
- /* Create list bucketing sub-directory only if stored-as-directories is on. */
- String lbDirName = null;
- lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
-
- FSPaths fpaths;
-
- if (!bDynParts && !filesCreated) {
- if (lbDirName != null) {
- FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
- } else {
- createBucketFiles(fsp);
- }
- }
-
- try {
- updateProgress();
-
- // if DP is enabled, get the final output writers and prepare the real output row
- assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
-
- if (bDynParts) {
- // copy the DP column values from the input row to dpVals
- dpVals.clear();
- dpWritables.clear();
- ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts,
- (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
- // get a set of RecordWriter based on the DP column values
- // pass the null value along to the escaping process to determine what the dir should be
- for (Object o : dpWritables) {
- if (o == null || o.toString().length() == 0) {
- dpVals.add(dpCtx.getDefaultPartitionName());
- } else {
- dpVals.add(o.toString());
- }
- }
- fpaths = getDynOutPaths(dpVals, lbDirName);
-
- } else {
- if (lbDirName != null) {
- fpaths = lookupListBucketingPaths(lbDirName);
- } else {
- fpaths = fsp;
- }
- }
-
- rowOutWriters = fpaths.getOutWriters();
- // check if all record writers implement statistics. if atleast one RW
- // doesn't implement stats interface we will fallback to conventional way
- // of gathering stats
- isCollectRWStats = areAllTrue(statsFromRecordWriter);
- if (conf.isGatherStats() && !isCollectRWStats) {
- if (statsCollectRawDataSize) {
- SerDeStats stats = serializer.getSerDeStats();
- if (stats != null) {
- fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
- }
- }
- fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);
- }
-
-
- if (row_count != null) {
- row_count.set(row_count.get() + 1);
- }
-
- if (!multiFileSpray) {
- rowOutWriters[0].write(row);
- } else {
- int keyHashCode = 0;
- key.setHashCode(keyHashCode);
- int bucketNum = prtner.getBucket(key, null, totalFiles);
- int idx = bucketMap.get(bucketNum);
- rowOutWriters[idx].write(row);
- }
- } catch (IOException e) {
- throw new HiveException(e);
- }
+ Object[] row = getRowObject(vrg, i);
+ super.processOp(row, tag);
}
}
@@ -187,7 +73,7 @@ public class VectorFileSinkOperator exte
}
for (int i = 0; i < vrg.projectionSize; i++) {
ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
- singleRow[i] = vrg.valueWriters[i].writeValue(vectorColumn, batchIndex);
+ singleRow[i] = valueWriters[i].writeValue(vectorColumn, batchIndex);
}
return singleRow;
}
Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Mon Sep 29 18:38:46 2014
@@ -1062,11 +1062,6 @@ public class Vectorizer implements Physi
}
private boolean validateFileSinkOperator(FileSinkOperator op) {
- // HIVE-7557: For now, turn off dynamic partitioning to give more time to
- // figure out how to make VectorFileSink work correctly with it...
- if (op.getConf().getDynPartCtx() != null) {
- return false;
- }
return true;
}
Modified: hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out?rev=1628251&r1=1628250&r2=1628251&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out (original)
+++ hive/branches/branch-0.14/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out Mon Sep 29 18:38:46 2014
@@ -214,6 +214,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -300,6 +301,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_limit_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -368,6 +370,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_buck_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -435,6 +438,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_buck_sort_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -585,6 +589,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -671,6 +676,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_limit_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -739,6 +745,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_buck_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -806,6 +813,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_buck_sort_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -1362,6 +1370,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -1443,6 +1452,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -1531,6 +1541,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -1610,6 +1621,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -1703,6 +1715,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -2097,6 +2110,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part_buck_sort2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@ -2164,6 +2178,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part_buck_sort2_orc
+ Execution mode: vectorized
Stage: Stage-2
Dependency Collection