You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/03 03:38:04 UTC
[47/50] [abbrv] hive git commit: HIVE-12878: Support Vectorization
for TEXTFILE and other formats (Matt McCline, reviewed by Sergey Shelukhin)
HIVE-12878: Support Vectorization for TEXTFILE and other formats (Matt McCline, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5285d8e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5285d8e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5285d8e
Branch: refs/heads/llap
Commit: d5285d8ebaf5bef2a13b2c2338be2fe683804b02
Parents: 2f0339b
Author: Matt McCline <mm...@hortonworks.com>
Authored: Mon May 2 16:58:53 2016 -0700
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Mon May 2 16:58:53 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 9 +
data/files/struct1_a.txt | 4 +
data/files/struct1_b.txt | 1 +
data/files/struct1_c.txt | 1 +
data/files/struct2_a.txt | 2 +
data/files/struct2_b.txt | 2 +
data/files/struct2_c.txt | 1 +
data/files/struct2_d.txt | 1 +
data/files/struct3_a.txt | 2 +
data/files/struct3_b.txt | 1 +
data/files/struct3_c.txt | 1 +
data/files/struct4_a.txt | 2 +
data/files/struct4_b.txt | 1 +
data/files/struct4_c.txt | 1 +
.../test/resources/testconfiguration.properties | 28 +-
.../hive/llap/io/api/impl/LlapInputFormat.java | 2 +-
.../hive/ql/exec/AbstractMapOperator.java | 178 +
.../apache/hadoop/hive/ql/exec/MapOperator.java | 86 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 33 +-
.../hadoop/hive/ql/exec/mr/ExecMapper.java | 3 +-
.../ql/exec/spark/SparkMapRecordHandler.java | 3 +-
.../hadoop/hive/ql/exec/tez/DagUtils.java | 3 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 15 +-
.../hive/ql/exec/tez/MapRecordSource.java | 6 +-
.../hive/ql/exec/tez/ReduceRecordSource.java | 4 +-
.../vector/VectorAppMasterEventOperator.java | 16 +-
.../hive/ql/exec/vector/VectorAssignRow.java | 1111 ++--
.../ql/exec/vector/VectorAssignRowDynBatch.java | 41 -
.../exec/vector/VectorAssignRowSameBatch.java | 36 -
.../ql/exec/vector/VectorDeserializeRow.java | 1114 ++--
.../hive/ql/exec/vector/VectorExtractRow.java | 971 +---
.../exec/vector/VectorExtractRowDynBatch.java | 40 -
.../exec/vector/VectorExtractRowSameBatch.java | 36 -
.../ql/exec/vector/VectorFileSinkOperator.java | 16 +-
.../ql/exec/vector/VectorGroupByOperator.java | 13 +-
.../exec/vector/VectorMapJoinBaseOperator.java | 11 +-
.../ql/exec/vector/VectorMapJoinOperator.java | 4 +-
.../VectorMapJoinOuterFilteredOperator.java | 17 +-
.../hive/ql/exec/vector/VectorMapOperator.java | 848 ++-
.../exec/vector/VectorReduceSinkOperator.java | 16 +-
.../exec/vector/VectorSMBMapJoinOperator.java | 11 +-
.../VectorSparkHashTableSinkOperator.java | 16 +-
...VectorSparkPartitionPruningSinkOperator.java | 13 +-
.../ql/exec/vector/VectorizationContext.java | 12 +-
.../ql/exec/vector/VectorizedBatchUtil.java | 49 +
.../VectorMapJoinGenerateResultOperator.java | 8 +-
.../fast/VectorMapJoinFastLongHashUtil.java | 10 +-
.../fast/VectorMapJoinFastStringCommon.java | 10 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 6 +-
.../hadoop/hive/ql/io/NullRowsInputFormat.java | 2 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +-
.../ql/io/parquet/MapredParquetInputFormat.java | 2 +-
.../hive/ql/optimizer/physical/Vectorizer.java | 334 +-
.../apache/hadoop/hive/ql/plan/BaseWork.java | 23 +
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 11 +
.../hive/ql/plan/VectorPartitionConversion.java | 172 +-
.../hive/ql/plan/VectorPartitionDesc.java | 164 +-
.../ql/exec/vector/TestVectorRowObject.java | 14 +-
.../hive/ql/exec/vector/TestVectorSerDeRow.java | 169 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 10 +-
.../avro_schema_evolution_native.q | 18 +
.../queries/clientpositive/bucket_groupby.q | 33 +-
.../queries/clientpositive/groupby_sort_10.q | 2 +
.../schema_evol_orc_acidvec_mapwork_part.q | 3 +
.../schema_evol_orc_acidvec_mapwork_table.q | 3 +
.../schema_evol_orc_nonvec_mapwork_table.q | 2 -
.../schema_evol_orc_vec_mapwork_part.q | 3 +
.../schema_evol_orc_vec_mapwork_table.q | 7 +-
.../schema_evol_text_fetchwork_table.q | 56 -
.../schema_evol_text_mapwork_table.q | 56 -
.../schema_evol_text_nonvec_fetchwork_part.q | 98 -
.../schema_evol_text_nonvec_fetchwork_table.q | 67 -
.../schema_evol_text_nonvec_mapwork_part.q | 828 ++-
..._evol_text_nonvec_mapwork_part_all_complex.q | 159 +
...vol_text_nonvec_mapwork_part_all_primitive.q | 509 ++
.../schema_evol_text_nonvec_mapwork_table.q | 822 ++-
.../schema_evol_text_vec_mapwork_part.q | 827 +++
...ema_evol_text_vec_mapwork_part_all_complex.q | 164 +
...a_evol_text_vec_mapwork_part_all_primitive.q | 514 ++
.../schema_evol_text_vec_mapwork_table.q | 826 +++
.../schema_evol_text_vecrow_mapwork_part.q | 827 +++
..._evol_text_vecrow_mapwork_part_all_complex.q | 165 +
...vol_text_vecrow_mapwork_part_all_primitive.q | 514 ++
.../schema_evol_text_vecrow_mapwork_table.q | 826 +++
.../clientpositive/tez_schema_evolution.q | 1 +
.../avro_schema_evolution_native.q.out | 206 +
.../results/clientpositive/bucket_groupby.q.out | 308 +-
.../clientpositive/groupby_sort_10.q.out | 8 +-
.../schema_evol_text_fetchwork_table.q.out | 298 --
.../schema_evol_text_mapwork_table.q.out | 298 --
...schema_evol_text_nonvec_fetchwork_part.q.out | 642 ---
...chema_evol_text_nonvec_fetchwork_table.q.out | 297 --
.../schema_evol_text_nonvec_mapwork_part.q.out | 4909 ++++++++++++++++--
...l_text_nonvec_mapwork_part_all_complex.q.out | 726 +++
...text_nonvec_mapwork_part_all_primitive.q.out | 3038 +++++++++++
.../schema_evol_text_nonvec_mapwork_table.q.out | 4376 +++++++++++++++-
.../schema_evol_text_vec_mapwork_part.q.out | 4479 ++++++++++++++++
...evol_text_vec_mapwork_part_all_complex.q.out | 730 +++
...ol_text_vec_mapwork_part_all_primitive.q.out | 3058 +++++++++++
.../schema_evol_text_vec_mapwork_table.q.out | 4221 +++++++++++++++
.../schema_evol_text_vecrow_mapwork_part.q.out | 4479 ++++++++++++++++
...l_text_vecrow_mapwork_part_all_complex.q.out | 732 +++
...text_vecrow_mapwork_part_all_primitive.q.out | 3058 +++++++++++
.../schema_evol_text_vecrow_mapwork_table.q.out | 4221 +++++++++++++++
.../tez/schema_evol_text_fetchwork_table.q.out | 298 --
.../tez/schema_evol_text_mapwork_table.q.out | 298 --
...schema_evol_text_nonvec_fetchwork_part.q.out | 642 ---
...chema_evol_text_nonvec_fetchwork_table.q.out | 297 --
.../schema_evol_text_nonvec_mapwork_part.q.out | 4453 ++++++++++++++--
...l_text_nonvec_mapwork_part_all_complex.q.out | 669 +++
...text_nonvec_mapwork_part_all_primitive.q.out | 2734 ++++++++++
.../schema_evol_text_nonvec_mapwork_table.q.out | 3920 +++++++++++++-
.../tez/schema_evol_text_vec_mapwork_part.q.out | 3999 ++++++++++++++
...evol_text_vec_mapwork_part_all_complex.q.out | 673 +++
...ol_text_vec_mapwork_part_all_primitive.q.out | 2738 ++++++++++
.../schema_evol_text_vec_mapwork_table.q.out | 3741 +++++++++++++
.../schema_evol_text_vecrow_mapwork_part.q.out | 3999 ++++++++++++++
...l_text_vecrow_mapwork_part_all_complex.q.out | 675 +++
...text_vecrow_mapwork_part_all_primitive.q.out | 2738 ++++++++++
.../schema_evol_text_vecrow_mapwork_table.q.out | 3741 +++++++++++++
.../vector_orc_string_reader_empty_dict.q.out | 62 +
.../tez/vector_partition_diff_num_cols.q.out | 1 +
.../tez/vector_tablesample_rows.q.out | 307 ++
.../vector_partition_diff_num_cols.q.out | 1 +
.../vector_tablesample_rows.q.out | 2 -
.../fast/BinarySortableDeserializeRead.java | 806 +--
.../hive/serde2/fast/DeserializeRead.java | 379 +-
.../lazy/fast/LazySimpleDeserializeRead.java | 704 +--
.../fast/LazyBinaryDeserializeRead.java | 944 +---
.../apache/hadoop/hive/serde2/VerifyFast.java | 75 +-
130 files changed, 82172 insertions(+), 9858 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2814353..caadf2a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2481,6 +2481,15 @@ public class HiveConf extends Configuration {
"This flag should be set to true to enable the new vectorization\n" +
"of queries using ReduceSink.\ni" +
"The default value is true."),
+ HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT("hive.vectorized.use.vectorized.input.format", true,
+ "This flag should be set to true to enable vectorizing with vectorized input file format capable SerDe.\n" +
+ "The default value is true."),
+ HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE("hive.vectorized.use.vector.serde.deserialize", false,
+ "This flag should be set to true to enable vectorizing rows using vector deserialize.\n" +
+ "The default value is false."),
+ HIVE_VECTORIZATION_USE_ROW_DESERIALIZE("hive.vectorized.use.row.serde.deserialize", false,
+ "This flag should be set to true to enable vectorizing using row deserialize.\n" +
+ "The default value is false."),
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control "
+ "whether to check, convert, and normalize partition value to conform to its column type in "
+ "partition operations including but not limited to insert, such as alter, describe etc."),
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct1_a.txt
----------------------------------------------------------------------
diff --git a/data/files/struct1_a.txt b/data/files/struct1_a.txt
new file mode 100644
index 0000000..b36846e
--- /dev/null
+++ b/data/files/struct1_a.txt
@@ -0,0 +1,4 @@
+1|true,200,72909,3244222,-99999999999,-29.0764,470614135,470614135,dynamic reptile ,dynamic reptile ,0004-09-22 18:26:29.519542222,2007-02-09,binary|original
+2|0,100,483777,14,-23866739993,-3651.672121,46114.284799488,46114.284799488, baffling , baffling ,2007-02-09 05:17:29.368756876,0004-09-22,binary|original
+3|false,72,3244222,-93222,30.774,-66475.561431,-66475.561431,0.561431,1,1,6229-06-28 02:54:28.970117179,5966-07-09,binary|original
+4|1,-90,754072151,3289094,46114.284799488,9250340.75,9250340.75,9250340.75,junkyard,junkyard,2002-05-10 05:29:48.990818073,1815-05-06,binary|original
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct1_b.txt
----------------------------------------------------------------------
diff --git a/data/files/struct1_b.txt b/data/files/struct1_b.txt
new file mode 100644
index 0000000..1887c68
--- /dev/null
+++ b/data/files/struct1_b.txt
@@ -0,0 +1 @@
+5|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary|new
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct1_c.txt
----------------------------------------------------------------------
diff --git a/data/files/struct1_c.txt b/data/files/struct1_c.txt
new file mode 100644
index 0000000..5d482c8
--- /dev/null
+++ b/data/files/struct1_c.txt
@@ -0,0 +1 @@
+6|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary|new
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_a.txt
----------------------------------------------------------------------
diff --git a/data/files/struct2_a.txt b/data/files/struct2_a.txt
new file mode 100644
index 0000000..7fdfef1
--- /dev/null
+++ b/data/files/struct2_a.txt
@@ -0,0 +1,2 @@
+3|new|true,200,72909,3244222,-99999999999,-29.0764,470614135,470614135,dynamic reptile ,dynamic reptile ,0004-09-22 18:26:29.519542222,2007-02-09,binary
+4|new|0,100,483777,14,-23866739993,-3651.672121,46114.284799488,46114.284799488, baffling , baffling ,2007-02-09 05:17:29.368756876,0004-09-22,binary
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_b.txt
----------------------------------------------------------------------
diff --git a/data/files/struct2_b.txt b/data/files/struct2_b.txt
new file mode 100644
index 0000000..a814af3
--- /dev/null
+++ b/data/files/struct2_b.txt
@@ -0,0 +1,2 @@
+5|new|false,72,3244222,-93222,30.774,-66475.561431,-66475.561431,0.561431,1,1,6229-06-28 02:54:28.970117179,5966-07-09,binary
+6|new|1,-90,754072151,3289094,46114.284799488,9250340.75,9250340.75,9250340.75,junkyard,junkyard,2002-05-10 05:29:48.990818073,1815-05-06,binary
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_c.txt
----------------------------------------------------------------------
diff --git a/data/files/struct2_c.txt b/data/files/struct2_c.txt
new file mode 100644
index 0000000..2c9c1bb
--- /dev/null
+++ b/data/files/struct2_c.txt
@@ -0,0 +1 @@
+7|new|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_d.txt
----------------------------------------------------------------------
diff --git a/data/files/struct2_d.txt b/data/files/struct2_d.txt
new file mode 100644
index 0000000..3c7801e
--- /dev/null
+++ b/data/files/struct2_d.txt
@@ -0,0 +1 @@
+8|new|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct3_a.txt
----------------------------------------------------------------------
diff --git a/data/files/struct3_a.txt b/data/files/struct3_a.txt
new file mode 100644
index 0000000..19dbd7f
--- /dev/null
+++ b/data/files/struct3_a.txt
@@ -0,0 +1,2 @@
+1|true,200,72909,3244222,-99999999999|original
+2|0,100,483777,14,-23866739993|original
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct3_b.txt
----------------------------------------------------------------------
diff --git a/data/files/struct3_b.txt b/data/files/struct3_b.txt
new file mode 100644
index 0000000..030e0c0
--- /dev/null
+++ b/data/files/struct3_b.txt
@@ -0,0 +1 @@
+3|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary|new
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct3_c.txt
----------------------------------------------------------------------
diff --git a/data/files/struct3_c.txt b/data/files/struct3_c.txt
new file mode 100644
index 0000000..236694b
--- /dev/null
+++ b/data/files/struct3_c.txt
@@ -0,0 +1 @@
+4|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary|new
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct4_a.txt
----------------------------------------------------------------------
diff --git a/data/files/struct4_a.txt b/data/files/struct4_a.txt
new file mode 100644
index 0000000..ecf832f
--- /dev/null
+++ b/data/files/struct4_a.txt
@@ -0,0 +1,2 @@
+1|original|true,200,72909,3244222,-99999999999
+2|original|0,100,483777,14,-23866739993
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct4_b.txt
----------------------------------------------------------------------
diff --git a/data/files/struct4_b.txt b/data/files/struct4_b.txt
new file mode 100644
index 0000000..701253c
--- /dev/null
+++ b/data/files/struct4_b.txt
@@ -0,0 +1 @@
+3|new|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct4_c.txt
----------------------------------------------------------------------
diff --git a/data/files/struct4_c.txt b/data/files/struct4_c.txt
new file mode 100644
index 0000000..c56e002
--- /dev/null
+++ b/data/files/struct4_c.txt
@@ -0,0 +1 @@
+4|new|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 0ef3161..346a38d 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -186,22 +186,28 @@ minitez.query.files.shared=acid_globallimit.q,\
ptf_streaming.q,\
sample1.q,\
schema_evol_stats.q,\
- schema_evol_text_nonvec_mapwork_table.q,\
- schema_evol_text_nonvec_fetchwork_table.q,\
- schema_evol_orc_nonvec_fetchwork_part.q,\
- schema_evol_orc_nonvec_mapwork_part.q,\
- schema_evol_text_nonvec_fetchwork_part.q,\
- schema_evol_text_nonvec_mapwork_part.q,\
schema_evol_orc_acid_mapwork_part.q,\
schema_evol_orc_acid_mapwork_table.q,\
- schema_evol_orc_acidvec_mapwork_table.q,\
schema_evol_orc_acidvec_mapwork_part.q,\
+ schema_evol_orc_acidvec_mapwork_table.q,\
+ schema_evol_orc_nonvec_fetchwork_part.q,\
+ schema_evol_orc_nonvec_fetchwork_table.q,\
+ schema_evol_orc_nonvec_mapwork_part.q,\
+ schema_evol_orc_nonvec_mapwork_table.q,\
schema_evol_orc_vec_mapwork_part.q,\
- schema_evol_text_fetchwork_table.q,\
- schema_evol_text_mapwork_table.q,\
schema_evol_orc_vec_mapwork_table.q,\
- schema_evol_orc_nonvec_mapwork_table.q,\
- schema_evol_orc_nonvec_fetchwork_table.q,\
+ schema_evol_text_nonvec_mapwork_part.q,\
+ schema_evol_text_nonvec_mapwork_part_all_complex.q,\
+ schema_evol_text_nonvec_mapwork_part_all_primitive.q,\
+ schema_evol_text_nonvec_mapwork_table.q,\
+ schema_evol_text_vec_mapwork_part.q,\
+ schema_evol_text_vec_mapwork_part_all_complex.q,\
+ schema_evol_text_vec_mapwork_part_all_primitive.q,\
+ schema_evol_text_vec_mapwork_table.q,\
+ schema_evol_text_vecrow_mapwork_part.q,\
+ schema_evol_text_vecrow_mapwork_part_all_complex.q,\
+ schema_evol_text_vecrow_mapwork_part_all_primitive.q,\
+ schema_evol_text_vecrow_mapwork_table.q,\
selectDistinctStar.q,\
script_env_var1.q,\
script_env_var2.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 9fb79a5..298f788 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -102,7 +102,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
sourceInputFormat.getRecordReader(split, job, reporter);
return rr;
}
- boolean isVectorMode = Utilities.isVectorMode(job);
+ boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(job);
if (!isVectorMode) {
LlapIoImpl.LOG.error("No LLAP IO in non-vectorized mode");
throw new UnsupportedOperationException("No LLAP IO in non-vectorized mode");
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
new file mode 100644
index 0000000..5c3012b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Abstract Map operator. Common code of MapOperator and VectorMapOperator.
+ **/
+@SuppressWarnings("deprecation")
+public abstract class AbstractMapOperator extends Operator<MapWork>
+ implements Serializable, Cloneable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Initialization call sequence:
+ *
+ * (Operator) Operator.setConf(MapWork conf);
+ * (Operator) Operator.initialize(
+ * Configuration hconf, ObjectInspector[] inputOIs);
+ *
+ * ([Vector]MapOperator) @Override setChildren(Configuration hconf)
+ *
+ * (Operator) Operator.passExecContext(ExecMapperContext execContext)
+ * (Operator) Operator.initializeLocalWork(Configuration hconf)
+ *
+ * (AbstractMapOperator) initializeMapOperator(Configuration hconf)
+ *
+ * [ (AbstractMapOperator) initializeContexts() ] // exec.tez.MapRecordProcessor only.
+ *
+ * (Operator) Operator.setReporter(Reporter rep)
+ *
+ */
+
+ /**
+ * Counter.
+ *
+ */
+ public static enum Counter {
+ DESERIALIZE_ERRORS,
+ RECORDS_IN
+ }
+
+ protected final transient LongWritable deserialize_error_count = new LongWritable();
+ protected final transient LongWritable recordCounter = new LongWritable();
+ protected transient long numRows = 0;
+
+ private final Map<Integer, DummyStoreOperator> connectedOperators
+ = new TreeMap<Integer, DummyStoreOperator>();
+
+ private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
+
+ private Path normalizePath(String onefile, boolean schemaless) {
+ //creating Path is expensive, so cache the corresponding
+ //Path object in normalizedPaths
+ Path path = normalizedPaths.get(onefile);
+ if (path == null) {
+ path = new Path(onefile);
+ if (schemaless && path.toUri().getScheme() != null) {
+ path = new Path(path.toUri().getPath());
+ }
+ normalizedPaths.put(onefile, path);
+ }
+ return path;
+ }
+
+ protected String getNominalPath(Path fpath) {
+ String nominal = null;
+ boolean schemaless = fpath.toUri().getScheme() == null;
+ for (String onefile : conf.getPathToAliases().keySet()) {
+ Path onepath = normalizePath(onefile, schemaless);
+ Path curfpath = fpath;
+ if(!schemaless && onepath.toUri().getScheme() == null) {
+ curfpath = new Path(fpath.toUri().getPath());
+ }
+ // check for the operators who will process rows coming to this Map Operator
+ if (onepath.toUri().relativize(curfpath.toUri()).equals(curfpath.toUri())) {
+ // not from this
+ continue;
+ }
+ if (nominal != null) {
+ throw new IllegalStateException("Ambiguous input path " + fpath);
+ }
+ nominal = onefile;
+ }
+ if (nominal == null) {
+ throw new IllegalStateException("Invalid input path " + fpath);
+ }
+ return nominal;
+ }
+
+ public abstract void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)
+ throws SerDeException, Exception;
+
+
+ /** Kryo ctor. */
+ protected AbstractMapOperator() {
+ super();
+ }
+
+ public AbstractMapOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
+ public abstract void setChildren(Configuration hconf) throws Exception;
+
+
+ public void initializeMapOperator(Configuration hconf) throws HiveException {
+ // set that parent initialization is done and call initialize on children
+ state = State.INIT;
+
+ statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+
+ numRows = 0;
+
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_IN + context, recordCounter);
+ }
+
+ public abstract void initializeContexts() throws HiveException;
+
+ public abstract Deserializer getCurrentDeserializer();
+
+ public abstract void process(Writable value) throws HiveException;
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ recordCounter.set(numRows);
+ super.closeOp(abort);
+ }
+
+ public void clearConnectedOperators() {
+ connectedOperators.clear();
+ }
+
+ public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) {
+ connectedOperators.put(tag, dummyOp);
+ }
+
+ public Map<Integer, DummyStoreOperator> getConnectedOperators() {
+ return connectedOperators;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index b1f9958..afe5ee2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -72,25 +72,11 @@ import com.google.common.annotations.VisibleForTesting;
* Writable data structure from a Table (instead of a Hive Object).
**/
@SuppressWarnings("deprecation")
-public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
+public class MapOperator extends AbstractMapOperator {
private static final long serialVersionUID = 1L;
- /**
- * Counter.
- *
- */
- public static enum Counter {
- DESERIALIZE_ERRORS,
- RECORDS_IN
- }
-
- private final transient LongWritable deserialize_error_count = new LongWritable();
- private final transient LongWritable recordCounter = new LongWritable();
- protected transient long numRows = 0;
protected transient long cntr = 1;
- private final Map<Integer, DummyStoreOperator> connectedOperators
- = new TreeMap<Integer, DummyStoreOperator>();
protected transient long logEveryNRows = 0;
// input path --> {operator --> context}
@@ -102,7 +88,6 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
// context for current input file
protected transient MapOpCtx[] currentCtxs;
- private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
protected static class MapOpCtx {
@@ -433,31 +418,6 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
}
}
- private String getNominalPath(Path fpath) {
- String nominal = null;
- boolean schemaless = fpath.toUri().getScheme() == null;
- for (String onefile : conf.getPathToAliases().keySet()) {
- Path onepath = normalizePath(onefile, schemaless);
- Path curfpath = fpath;
- if(!schemaless && onepath.toUri().getScheme() == null) {
- curfpath = new Path(fpath.toUri().getPath());
- }
- // check for the operators who will process rows coming to this Map Operator
- if (onepath.toUri().relativize(curfpath.toUri()).equals(curfpath.toUri())) {
- // not from this
- continue;
- }
- if (nominal != null) {
- throw new IllegalStateException("Ambiguous input path " + fpath);
- }
- nominal = onefile;
- }
- if (nominal == null) {
- throw new IllegalStateException("Invalid input path " + fpath);
- }
- return nominal;
- }
-
/** Kryo ctor. */
protected MapOperator() {
super();
@@ -473,32 +433,17 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
}
public void initializeMapOperator(Configuration hconf) throws HiveException {
- // set that parent initialization is done and call initialize on children
- state = State.INIT;
- statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+ super.initializeMapOperator(hconf);
- numRows = 0;
cntr = 1;
logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
- String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
- if (context != null && !context.isEmpty()) {
- context = "_" + context.replace(" ","_");
- }
- statsMap.put(Counter.RECORDS_IN + context, recordCounter);
-
for (Entry<Operator<?>, StructObjectInspector> entry : childrenOpToOI.entrySet()) {
Operator<?> child = entry.getKey();
child.initialize(hconf, new ObjectInspector[] {entry.getValue()});
}
}
- @Override
- public void closeOp(boolean abort) throws HiveException {
- recordCounter.set(numRows);
- super.closeOp(abort);
- }
-
// Find context for current input file
@Override
public void cleanUpInputFileChangedOp() throws HiveException {
@@ -528,20 +473,6 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
}
- private Path normalizePath(String onefile, boolean schemaless) {
- //creating Path is expensive, so cache the corresponding
- //Path object in normalizedPaths
- Path path = normalizedPaths.get(onefile);
- if (path == null) {
- path = new Path(onefile);
- if (schemaless && path.toUri().getScheme() != null) {
- path = new Path(path.toUri().getPath());
- }
- normalizedPaths.put(onefile, path);
- }
- return path;
- }
-
public void process(Writable value) throws HiveException {
// A mapper can span multiple files/partitions.
// The serializers need to be reset if the input file changed
@@ -698,17 +629,4 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
return currentCtxs[0].deserializer;
}
-
- public void clearConnectedOperators() {
- connectedOperators.clear();
- }
-
- public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) {
- connectedOperators.put(tag, dummyOp);
- }
-
- public Map<Integer, DummyStoreOperator> getConnectedOperators() {
- return connectedOperators;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ab0635e..449bef8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -216,6 +216,7 @@ public final class Utilities {
public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
public static final String HIVE_ADDED_JARS = "hive.added.jars";
public static final String VECTOR_MODE = "VECTOR_MODE";
+ public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT";
public static String MAPNAME = "Map ";
public static String REDUCENAME = "Reducer ";
@@ -3254,24 +3255,39 @@ public final class Utilities {
/**
* Returns true if a plan is both configured for vectorized execution
- * and vectorization is allowed. The plan may be configured for vectorization
+ * and the node is vectorized and the Input File Format is marked VectorizedInputFileFormat.
+ *
+ * The plan may be configured for vectorization
* but vectorization disallowed eg. for FetchOperator execution.
*/
- public static boolean isVectorMode(Configuration conf) {
+ public static boolean getUseVectorizedInputFileFormat(Configuration conf) {
if (conf.get(VECTOR_MODE) != null) {
// this code path is necessary, because with HS2 and client
// side split generation we end up not finding the map work.
// This is because of thread local madness (tez split
// generation is multi-threaded - HS2 plan cache uses thread
// locals).
- return conf.getBoolean(VECTOR_MODE, false);
+ return
+ conf.getBoolean(VECTOR_MODE, false) &&
+ conf.getBoolean(USE_VECTORIZED_INPUT_FILE_FORMAT, false);
} else {
- return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
- && Utilities.getPlanPath(conf) != null
- && Utilities.getMapWork(conf).getVectorMode();
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+ Utilities.getPlanPath(conf) != null) {
+ MapWork mapWork = Utilities.getMapWork(conf);
+ return (mapWork.getVectorMode() && mapWork.getUseVectorizedInputFileFormat());
+ } else {
+ return false;
+ }
}
}
+
+ public static boolean getUseVectorizedInputFileFormat(Configuration conf, MapWork mapWork) {
+ return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+ mapWork.getVectorMode() &&
+ mapWork.getUseVectorizedInputFileFormat();
+ }
+
/**
* @param conf
* @return the configured VectorizedRowBatchCtx for a MapWork task.
@@ -3288,11 +3304,6 @@ public final class Utilities {
return result;
}
- public static boolean isVectorMode(Configuration conf, MapWork mapWork) {
- return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
- && mapWork.getVectorMode();
- }
-
public static void clearWorkMapForConf(Configuration conf) {
// Remove cached query plans for the current query only
Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index c34dd1f..f90a788 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -59,7 +60,7 @@ import org.apache.hadoop.util.StringUtils;
*/
public class ExecMapper extends MapReduceBase implements Mapper {
- private MapOperator mo;
+ private AbstractMapOperator mo;
private OutputCollector oc;
private JobConf jc;
private boolean abort = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index d8fe35f..48dfedc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -55,7 +56,7 @@ import org.apache.hadoop.mapred.Reporter;
*/
public class SparkMapRecordHandler extends SparkRecordHandler {
private static final Logger LOG = LoggerFactory.getLogger(SparkMapRecordHandler.class);
- private MapOperator mo;
+ private AbstractMapOperator mo;
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
private ExecMapperContext execContext;
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 79da860..a1e4e6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -625,9 +625,10 @@ public class DagUtils {
// generation we end up not finding the map work. This is
// because of thread local madness (tez split generation is
// multi-threaded - HS2 plan cache uses thread locals). Setting
- // VECTOR_MODE causes the split gen code to use the conf instead
+ // VECTOR_MODE/USE_VECTORIZED_INPUT_FILE_FORMAT causes the split gen code to use the conf instead
// of the map work.
conf.setBoolean(Utilities.VECTOR_MODE, mapWork.getVectorMode());
+ conf.setBoolean(Utilities.USE_VECTORIZED_INPUT_FILE_FORMAT, mapWork.getUseVectorizedInputFileFormat());
dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
"split_" + mapWork.getName().replaceAll(" ", "_")), true);
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 0584ad8..9a9f43a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
@@ -75,8 +76,8 @@ public class MapRecordProcessor extends RecordProcessor {
public static final Logger l4j = LoggerFactory.getLogger(MapRecordProcessor.class);
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
- private MapOperator mapOp;
- private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
+ private AbstractMapOperator mapOp;
+ private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<AbstractMapOperator>();
private MapRecordSource[] sources;
private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
private int position;
@@ -183,7 +184,7 @@ public class MapRecordProcessor extends RecordProcessor {
boolean fromCache = false;
if (mergeWorkList != null) {
- MapOperator mergeMapOp = null;
+ AbstractMapOperator mergeMapOp = null;
for (BaseWork mergeWork : mergeWorkList) {
MapWork mergeMapWork = (MapWork) mergeWork;
if (mergeMapWork.getVectorMode()) {
@@ -261,7 +262,7 @@ public class MapRecordProcessor extends RecordProcessor {
initializeMapRecordSources();
mapOp.initializeMapOperator(jconf);
if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
- for (MapOperator mergeMapOp : mergeMapOpList) {
+ for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
mergeMapOp.initializeMapOperator(jconf);
}
@@ -309,7 +310,7 @@ public class MapRecordProcessor extends RecordProcessor {
reader = legacyMRInput.getReader();
}
sources[position].init(jconf, mapOp, reader);
- for (MapOperator mapOp : mergeMapOpList) {
+ for (AbstractMapOperator mapOp : mergeMapOpList) {
int tag = mapOp.getConf().getTag();
sources[tag] = new MapRecordSource();
String inputName = mapOp.getConf().getName();
@@ -326,7 +327,7 @@ public class MapRecordProcessor extends RecordProcessor {
@SuppressWarnings("deprecation")
private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders,
- MapOperator mapOp)
+ AbstractMapOperator mapOp)
throws Exception {
List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders);
// this sets up the map operator contexts correctly
@@ -394,7 +395,7 @@ public class MapRecordProcessor extends RecordProcessor {
}
mapOp.close(abort);
if (mergeMapOpList.isEmpty() == false) {
- for (MapOperator mergeMapOp : mergeMapOpList) {
+ for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
mergeMapOp.close(abort);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
index b53c933..add7d08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.Writable;
@@ -39,11 +39,11 @@ public class MapRecordSource implements RecordSource {
public static final Logger LOG = LoggerFactory.getLogger(MapRecordSource.class);
private ExecMapperContext execContext = null;
- private MapOperator mapOp = null;
+ private AbstractMapOperator mapOp = null;
private KeyValueReader reader = null;
private final boolean grouped = false;
- void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException {
+ void init(JobConf jconf, AbstractMapOperator mapOp, KeyValueReader reader) throws IOException {
execContext = mapOp.getExecContext();
this.mapOp = mapOp;
this.reader = reader;
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 1f75d07..e966ff1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -413,7 +413,7 @@ public class ReduceRecordSource implements RecordSource {
// VectorizedBatchUtil.displayBytes(keyBytes, 0, keyLength));
keyBinarySortableDeserializeToRow.setBytes(keyBytes, 0, keyLength);
- keyBinarySortableDeserializeToRow.deserializeByValue(batch, 0);
+ keyBinarySortableDeserializeToRow.deserialize(batch, 0);
for(int i = 0; i < firstValueColumnOffset; i++) {
VectorizedBatchUtil.setRepeatingColumn(batch, i);
}
@@ -431,7 +431,7 @@ public class ReduceRecordSource implements RecordSource {
// VectorizedBatchUtil.displayBytes(valueBytes, 0, valueLength));
valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength);
- valueLazyBinaryDeserializeToRow.deserializeByValue(batch, rowIdx);
+ valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
}
rowIdx++;
if (rowIdx >= BATCH_SIZE) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
index 1951569..2bf6ac5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
@@ -45,7 +45,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
private transient boolean firstBatch;
- private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ private transient VectorExtractRow vectorExtractRow;
protected transient Object[] singleRow;
@@ -88,16 +88,14 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
VectorizedRowBatch batch = (VectorizedRowBatch) data;
if (firstBatch) {
- vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
- vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
- singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ singleRow = new Object[vectorExtractRow.getCount()];
firstBatch = false;
}
- vectorExtractRowDynBatch.setBatchOnEntry(batch);
-
ObjectInspector rowInspector = inputObjInspectors[0];
try {
Writable writableRow;
@@ -105,7 +103,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
int selected[] = batch.selected;
for (int logical = 0 ; logical < batch.size; logical++) {
int batchIndex = selected[logical];
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
writableRow = serializer.serialize(singleRow, rowInspector);
writableRow.write(buffer);
if (buffer.getLength() > MAX_SIZE) {
@@ -117,7 +115,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
}
} else {
for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
- vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+ vectorExtractRow.extractRow(batch, batchIndex, singleRow);
writableRow = serializer.serialize(singleRow, rowInspector);
writableRow.write(buffer);
if (buffer.getLength() > MAX_SIZE) {
@@ -133,7 +131,5 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
}
forward(data, rowInspector);
-
- vectorExtractRowDynBatch.forgetBatchOnExit();
}
}