You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/13 02:12:27 UTC
[02/51] [abbrv] hive git commit: HIVE-11886 : LLAP: merge master into branch (Sergey Shelukhin)
HIVE-11886 : LLAP: merge master into branch (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f324305a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f324305a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f324305a
Branch: refs/heads/master
Commit: f324305a71ac31faa568b8a0078b1e9b217a3570
Parents: 79c7031 e9c8d7c
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Sep 18 13:35:36 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Sep 18 13:35:36 2015 -0700
----------------------------------------------------------------------
.../benchmark/serde/LazySimpleSerDeBench.java | 453 ++++
.../hive/ql/security/FolderPermissionBase.java | 17 +-
.../test/resources/testconfiguration.properties | 2 +
.../org/apache/hadoop/hive/ql/QTestUtil.java | 9 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 3 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 6 +
pom.xml | 22 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 269 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 19 +-
.../ql/exec/persistence/PTFRowContainer.java | 14 +-
.../hive/ql/exec/persistence/RowContainer.java | 12 +-
.../ql/exec/tez/tools/KeyValuesInputMerger.java | 1 -
.../ql/exec/vector/VectorizationContext.java | 10 +-
.../hadoop/hive/ql/hooks/LineageLogger.java | 95 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 150 +-
.../apache/hadoop/hive/ql/io/orc/OrcSerde.java | 1 +
.../apache/hadoop/hive/ql/io/orc/OrcStruct.java | 2 +-
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 4 +-
.../hive/ql/io/parquet/ProjectionPusher.java | 3 +-
.../hive/ql/io/sarg/ConvertAstToSearchArg.java | 4 +
.../apache/hadoop/hive/ql/lib/RuleRegExp.java | 61 +-
.../ql/optimizer/ColumnPrunerProcFactory.java | 3 +
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 4 +-
.../calcite/reloperators/HiveBetween.java | 75 +
.../optimizer/calcite/reloperators/HiveIn.java | 41 +
.../rules/HiveAggregateProjectMergeRule.java | 151 ++
.../calcite/rules/HivePreFilteringRule.java | 37 +-
.../calcite/rules/HiveRelFieldTrimmer.java | 145 +-
.../translator/PlanModifierForASTConv.java | 4 +-
.../translator/SqlFunctionConverter.java | 16 +-
.../hive/ql/optimizer/lineage/LineageCtx.java | 8 +-
.../hadoop/hive/ql/parse/CalcitePlanner.java | 11 +-
.../hive/ql/parse/DDLSemanticAnalyzer.java | 17 +
.../apache/hadoop/hive/ql/parse/HiveParser.g | 7 +-
.../apache/hadoop/hive/ql/parse/QBSubQuery.java | 7 -
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +
.../hive/ql/parse/SemanticAnalyzerFactory.java | 2 +
.../hadoop/hive/ql/parse/SubQueryUtils.java | 11 -
.../org/apache/hadoop/hive/ql/plan/DDLWork.java | 21 +
.../hadoop/hive/ql/plan/HiveOperation.java | 1 +
.../hive/ql/plan/ShowCreateDatabaseDesc.java | 94 +
.../authorization/plugin/HiveOperationType.java | 1 +
.../plugin/sqlstd/Operation2Privilege.java | 2 +
.../org/apache/hadoop/hive/ql/udf/UDFJson.java | 2 +
.../hive/ql/udf/generic/GenericUDAFMax.java | 16 +-
.../exec/persistence/TestPTFRowContainer.java | 31 +-
.../hadoop/hive/ql/io/orc/TestOrcStruct.java | 2 +
.../clientpositive/drop_table_with_index.q | 35 +
.../queries/clientpositive/exchgpartition2lel.q | 32 +
ql/src/test/queries/clientpositive/lineage3.q | 26 +
.../test/queries/clientpositive/load_orc_part.q | 5 +
.../clientpositive/show_create_database.q | 3 +
.../queries/clientpositive/subquery_views.q | 22 +-
.../queries/clientpositive/vector_char_cast.q | 9 +
.../queries/clientpositive/windowing_udaf.q | 4 +
.../subquery_exists_implicit_gby.q.out | 8 +-
.../subquery_nested_subquery.q.out | 4 +-
.../subquery_notexists_implicit_gby.q.out | 8 +-
.../subquery_windowing_corr.q.out | 7 +-
.../alter_partition_coltype.q.out | 8 +-
.../clientpositive/annotate_stats_groupby.q.out | 106 +-
.../annotate_stats_groupby2.q.out | 28 +-
.../results/clientpositive/auto_join18.q.out | 12 +-
.../auto_join18_multi_distinct.q.out | 12 +-
.../results/clientpositive/auto_join27.q.out | 18 +-
.../results/clientpositive/auto_join32.q.out | 4 +-
.../clientpositive/binarysortable_1.q.out | Bin 4329 -> 4325 bytes
.../clientpositive/correlationoptimizer2.q.out | 220 +-
.../clientpositive/correlationoptimizer6.q.out | 232 +-
ql/src/test/results/clientpositive/count.q.out | 14 +-
.../results/clientpositive/ctas_colname.q.out | 52 +-
.../test/results/clientpositive/database.q.out | 2 +-
.../clientpositive/decimal_precision.q.out | 4 +-
.../results/clientpositive/decimal_udf.q.out | 30 +-
.../results/clientpositive/distinct_stats.q.out | 14 +-
.../clientpositive/drop_table_with_index.q.out | 152 ++
.../dynpart_sort_opt_vectorization.q.out | 105 +-
.../dynpart_sort_optimization.q.out | 105 +-
...ryption_select_read_only_encrypted_tbl.q.out | 4 +-
.../clientpositive/exchgpartition2lel.q.out | 182 ++
.../clientpositive/explain_logical.q.out | 78 +-
.../clientpositive/fetch_aggregation.q.out | 4 +-
.../test/results/clientpositive/gby_star.q.out | 54 +-
.../test/results/clientpositive/groupby12.q.out | 6 +-
.../results/clientpositive/groupby5_map.q.out | 4 +-
.../clientpositive/groupby5_map_skew.q.out | 4 +-
.../results/clientpositive/groupby_cube1.q.out | 12 +-
.../groupby_distinct_samekey.q.out | 6 +-
.../clientpositive/groupby_grouping_sets2.q.out | 10 +-
.../clientpositive/groupby_grouping_sets3.q.out | 12 +-
.../clientpositive/groupby_grouping_sets5.q.out | 8 +-
.../clientpositive/groupby_grouping_sets6.q.out | 8 +-
.../clientpositive/groupby_position.q.out | 36 +-
.../clientpositive/groupby_resolution.q.out | 60 +-
.../clientpositive/groupby_rollup1.q.out | 12 +-
.../clientpositive/groupby_sort_10.q.out | 8 +-
.../clientpositive/groupby_sort_11.q.out | 10 +-
.../results/clientpositive/groupby_sort_8.q.out | 12 +-
ql/src/test/results/clientpositive/having.q.out | 62 +-
.../test/results/clientpositive/having2.q.out | 12 +-
.../clientpositive/index_auto_mult_tables.q.out | 12 +-
.../clientpositive/index_auto_self_join.q.out | 12 +-
.../clientpositive/index_auto_update.q.out | 6 +-
.../index_bitmap_auto_partitioned.q.out | 6 +-
.../index_bitmap_compression.q.out | 6 +-
.../infer_bucket_sort_dyn_part.q.out | 4 +-
.../infer_bucket_sort_map_operators.q.out | 4 +-
ql/src/test/results/clientpositive/join18.q.out | 12 +-
.../clientpositive/join18_multi_distinct.q.out | 12 +-
ql/src/test/results/clientpositive/join31.q.out | 36 +-
.../limit_partition_metadataonly.q.out | 4 +-
.../results/clientpositive/limit_pushdown.q.out | 36 +-
.../test/results/clientpositive/lineage2.q.out | 2 +-
.../test/results/clientpositive/lineage3.q.out | 72 +-
.../list_bucket_query_multiskew_3.q.out | 2 +-
.../results/clientpositive/load_orc_part.q.out | 26 +
.../clientpositive/mapjoin_mapjoin.q.out | 32 +-
.../clientpositive/metadata_only_queries.q.out | 4 +-
.../results/clientpositive/metadataonly1.q.out | 112 +-
.../results/clientpositive/multiMapJoin2.q.out | 226 +-
.../nonblock_op_deduplicate.q.out | 8 +-
.../results/clientpositive/nonmr_fetch.q.out | 14 +-
.../clientpositive/partition_multilevels.q.out | 8 +-
.../test/results/clientpositive/ppd_gby.q.out | 12 +-
.../test/results/clientpositive/ppd_gby2.q.out | 60 +-
.../clientpositive/ppd_join_filter.q.out | 98 +-
.../ql_rewrite_gbtoidx_cbo_1.q.out | 168 +-
.../ql_rewrite_gbtoidx_cbo_2.q.out | 94 +-
.../reduce_deduplicate_extended.q.out | 32 +-
.../clientpositive/selectDistinctStar.q.out | 44 +-
.../clientpositive/show_create_database.q.out | 19 +
.../clientpositive/spark/auto_join18.q.out | 10 +-
.../spark/auto_join18_multi_distinct.q.out | 12 +-
.../clientpositive/spark/auto_join27.q.out | 18 +-
.../clientpositive/spark/auto_join32.q.out | 53 +-
.../results/clientpositive/spark/count.q.out | 14 +-
.../clientpositive/spark/groupby5_map.q.out | 4 +-
.../spark/groupby5_map_skew.q.out | 4 +-
.../clientpositive/spark/groupby_cube1.q.out | 12 +-
.../clientpositive/spark/groupby_position.q.out | 18 +-
.../spark/groupby_resolution.q.out | 60 +-
.../clientpositive/spark/groupby_rollup1.q.out | 12 +-
.../results/clientpositive/spark/having.q.out | 62 +-
.../spark/infer_bucket_sort_map_operators.q.out | 4 +-
.../results/clientpositive/spark/join18.q.out | 10 +-
.../spark/join18_multi_distinct.q.out | 12 +-
.../results/clientpositive/spark/join31.q.out | 36 +-
.../spark/limit_partition_metadataonly.q.out | 4 +-
.../clientpositive/spark/limit_pushdown.q.out | 34 +-
.../clientpositive/spark/mapjoin_mapjoin.q.out | 24 +-
.../spark/metadata_only_queries.q.out | 4 +-
.../clientpositive/spark/ppd_join_filter.q.out | 90 +-
.../spark/ql_rewrite_gbtoidx_cbo_1.q.out | 168 +-
.../clientpositive/spark/stats_only_null.q.out | 8 +-
.../clientpositive/spark/subquery_in.q.out | 36 +-
.../results/clientpositive/spark/union11.q.out | 42 +-
.../results/clientpositive/spark/union14.q.out | 28 +-
.../results/clientpositive/spark/union15.q.out | 28 +-
.../results/clientpositive/spark/union28.q.out | 4 +-
.../results/clientpositive/spark/union30.q.out | 4 +-
.../results/clientpositive/spark/union33.q.out | 8 +-
.../results/clientpositive/spark/union5.q.out | 34 +-
.../results/clientpositive/spark/union7.q.out | 28 +-
.../clientpositive/spark/union_remove_21.q.out | 4 +-
.../spark/vector_count_distinct.q.out | 4 +-
.../spark/vector_decimal_aggregate.q.out | 12 +-
.../spark/vector_distinct_2.q.out | 28 +-
.../clientpositive/spark/vector_groupby_3.q.out | 30 +-
.../spark/vector_mapjoin_reduce.q.out | 36 +-
.../clientpositive/spark/vector_orderby_5.q.out | 6 +-
.../clientpositive/spark/vectorization_0.q.out | 16 +-
.../clientpositive/spark/vectorization_13.q.out | 32 +-
.../clientpositive/spark/vectorization_15.q.out | 16 +-
.../clientpositive/spark/vectorization_16.q.out | 16 +-
.../clientpositive/spark/vectorization_9.q.out | 16 +-
.../spark/vectorization_pushdown.q.out | 4 +-
.../spark/vectorization_short_regress.q.out | 74 +-
.../spark/vectorized_nested_mapjoin.q.out | 17 +-
.../spark/vectorized_timestamp_funcs.q.out | 12 +-
.../clientpositive/stats_only_null.q.out | 8 +-
.../results/clientpositive/stats_ppr_all.q.out | 16 +-
.../subq_where_serialization.q.out | 18 +-
.../clientpositive/subquery_exists_having.q.out | 48 +-
.../results/clientpositive/subquery_in.q.out | 36 +-
.../clientpositive/subquery_in_having.q.out | 260 +-
.../clientpositive/subquery_notexists.q.out | 18 +-
.../subquery_notexists_having.q.out | 26 +-
.../results/clientpositive/subquery_notin.q.out | 24 +-
.../subquery_notin_having.q.java1.7.out | 50 +-
.../subquery_unqualcolumnrefs.q.out | 74 +-
.../results/clientpositive/subquery_views.q.out | 124 +-
.../test/results/clientpositive/tez/count.q.out | 14 +-
.../tez/dynamic_partition_pruning.q.out | 88 +-
.../tez/dynpart_sort_opt_vectorization.q.out | 90 +-
.../tez/dynpart_sort_optimization.q.out | 89 +-
.../clientpositive/tez/explainuser_1.q.out | 2319 +++++++++---------
.../clientpositive/tez/explainuser_2.q.out | 782 +++---
.../results/clientpositive/tez/having.q.out | 62 +-
.../clientpositive/tez/limit_pushdown.q.out | 34 +-
.../clientpositive/tez/mapjoin_mapjoin.q.out | 24 +-
.../tez/metadata_only_queries.q.out | 4 +-
.../clientpositive/tez/metadataonly1.q.out | 44 +-
.../test/results/clientpositive/tez/mrr.q.out | 94 +-
.../clientpositive/tez/selectDistinctStar.q.out | 44 +-
.../tez/show_create_database.q.out | 19 +
.../clientpositive/tez/stats_only_null.q.out | 8 +-
.../clientpositive/tez/subquery_in.q.out | 36 +-
.../results/clientpositive/tez/tez_dml.q.out | 6 +-
.../results/clientpositive/tez/union5.q.out | 44 +-
.../results/clientpositive/tez/union7.q.out | 28 +-
.../clientpositive/tez/unionDistinct_1.q.out | 8 +-
.../clientpositive/tez/vector_aggregate_9.q.out | 4 +-
.../tez/vector_binary_join_groupby.q.out | 4 +-
.../clientpositive/tez/vector_char_cast.q.out | 35 +
.../tez/vector_count_distinct.q.out | 4 +-
.../tez/vector_decimal_aggregate.q.out | 12 +-
.../tez/vector_decimal_precision.q.out | 4 +-
.../clientpositive/tez/vector_decimal_udf.q.out | 30 +-
.../clientpositive/tez/vector_distinct_2.q.out | 28 +-
.../clientpositive/tez/vector_groupby_3.q.out | 30 +-
.../tez/vector_groupby_reduce.q.out | 8 +-
.../tez/vector_grouping_sets.q.out | 8 +-
.../tez/vector_mapjoin_reduce.q.out | 36 +-
.../clientpositive/tez/vector_orderby_5.q.out | 6 +-
.../clientpositive/tez/vector_outer_join2.q.out | 20 +-
.../tez/vector_partition_diff_num_cols.q.out | 20 +-
.../tez/vector_partitioned_date_time.q.out | 12 +-
.../tez/vector_reduce_groupby_decimal.q.out | 24 +-
.../clientpositive/tez/vectorization_0.q.out | 16 +-
.../clientpositive/tez/vectorization_13.q.out | 32 +-
.../clientpositive/tez/vectorization_15.q.out | 16 +-
.../clientpositive/tez/vectorization_16.q.out | 16 +-
.../clientpositive/tez/vectorization_9.q.out | 16 +-
.../tez/vectorization_limit.q.out | 14 +-
.../tez/vectorization_pushdown.q.out | 4 +-
.../tez/vectorization_short_regress.q.out | 74 +-
.../tez/vectorized_distinct_gby.q.out | 8 +-
.../vectorized_dynamic_partition_pruning.q.out | 88 +-
.../tez/vectorized_nested_mapjoin.q.out | 18 +-
.../clientpositive/tez/vectorized_parquet.q.out | 6 +-
.../tez/vectorized_timestamp_funcs.q.out | 12 +-
ql/src/test/results/clientpositive/udf8.q.out | 4 +-
.../test/results/clientpositive/udf_count.q.out | 16 +-
.../test/results/clientpositive/union11.q.out | 70 +-
.../test/results/clientpositive/union14.q.out | 32 +-
.../test/results/clientpositive/union15.q.out | 38 +-
.../test/results/clientpositive/union28.q.out | 8 +-
.../test/results/clientpositive/union30.q.out | 8 +-
.../test/results/clientpositive/union33.q.out | 8 +-
ql/src/test/results/clientpositive/union5.q.out | 48 +-
ql/src/test/results/clientpositive/union7.q.out | 32 +-
.../clientpositive/unionDistinct_1.q.out | 8 +-
.../clientpositive/union_remove_21.q.out | 8 +-
.../clientpositive/vector_aggregate_9.q.out | 4 +-
.../vector_aggregate_without_gby.q.out | 4 +-
.../vector_binary_join_groupby.q.out | 4 +-
.../clientpositive/vector_char_cast.q.out | 35 +
.../clientpositive/vector_count_distinct.q.out | 6 +-
.../vector_decimal_aggregate.q.out | 12 +-
.../vector_decimal_precision.q.out | 4 +-
.../clientpositive/vector_decimal_udf.q.out | 30 +-
.../clientpositive/vector_distinct_2.q.out | 28 +-
.../clientpositive/vector_groupby_3.q.out | 30 +-
.../clientpositive/vector_groupby_reduce.q.out | 8 +-
.../clientpositive/vector_grouping_sets.q.out | 8 +-
.../clientpositive/vector_left_outer_join.q.out | 8 +-
.../clientpositive/vector_mapjoin_reduce.q.out | 36 +-
.../clientpositive/vector_orderby_5.q.out | 6 +-
.../clientpositive/vector_outer_join1.q.out | 8 +-
.../clientpositive/vector_outer_join2.q.out | 28 +-
.../clientpositive/vector_outer_join3.q.out | 24 +-
.../clientpositive/vector_outer_join4.q.out | 8 +-
.../clientpositive/vector_outer_join5.q.out | 48 +-
.../vector_partition_diff_num_cols.q.out | 20 +-
.../vector_partitioned_date_time.q.out | 12 +-
.../vector_reduce_groupby_decimal.q.out | 24 +-
.../clientpositive/vectorization_0.q.out | 16 +-
.../clientpositive/vectorization_13.q.out | 32 +-
.../clientpositive/vectorization_15.q.out | 16 +-
.../clientpositive/vectorization_16.q.out | 16 +-
.../clientpositive/vectorization_9.q.out | 16 +-
.../clientpositive/vectorization_limit.q.out | 16 +-
.../clientpositive/vectorization_pushdown.q.out | 4 +-
.../vectorization_short_regress.q.out | 74 +-
.../vectorized_distinct_gby.q.out | 12 +-
.../vectorized_nested_mapjoin.q.out | 26 +-
.../clientpositive/vectorized_parquet.q.out | 6 +-
.../vectorized_parquet_types.q.out | 6 +-
.../vectorized_timestamp_funcs.q.out | 12 +-
.../results/clientpositive/windowing_udaf.q.out | 12 +
.../hive/serde2/ColumnProjectionUtils.java | 22 +
.../hadoop/hive/serde2/lazy/LazyByte.java | 4 +
.../hadoop/hive/serde2/lazy/LazyDouble.java | 4 +
.../hadoop/hive/serde2/lazy/LazyFloat.java | 4 +
.../hadoop/hive/serde2/lazy/LazyInteger.java | 4 +
.../hadoop/hive/serde2/lazy/LazyLong.java | 4 +
.../hadoop/hive/serde2/lazy/LazyShort.java | 4 +
.../hadoop/hive/serde2/lazy/LazyUtils.java | 28 +
.../org/apache/hive/service/cli/Column.java | 2 +-
.../org/apache/hive/service/cli/TestColumn.java | 129 +
.../hive/ql/io/sarg/SearchArgumentFactory.java | 5 +-
.../hive/ql/io/sarg/SearchArgumentImpl.java | 7 +-
302 files changed, 7055 insertions(+), 5389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 4f7bb78,0000000..c934f39
mode 100644,000000..100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@@ -1,949 -1,0 +1,950 @@@
+package org.apache.hadoop.hive.llap.io.encoded;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.common.Pool;
+import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.DataReader;
+import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
+import org.apache.hadoop.hive.ql.io.orc.OrcConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
+import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
+import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hive.common.util.FixedSizedObjectPool;
+
+/**
+ * This produces EncodedColumnBatch via ORC EncodedDataImpl.
+ * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
+ * it inserts itself into the pipeline to put the data in cache, before passing it to the real
+ * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es.
+ */
+public class OrcEncodedDataReader extends CallableWithNdc<Void>
+ implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
+ private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class);
+ public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
+ new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ public ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ public void resetBeforeOffer(ColumnStreamData t) {
+ t.reset();
+ }
+ });
+ public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
+ @Override
+ public OrcEncodedColumnBatch create() {
+ return new OrcEncodedColumnBatch();
+ }
+ @Override
+ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
+ t.reset();
+ }
+ });
+ private static final PoolFactory POOL_FACTORY = new PoolFactory() {
+ @Override
+ public <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper) {
+ return new FixedSizedObjectPool<>(size, helper);
+ }
+
+ @Override
+ public Pool<ColumnStreamData> createColumnStreamDataPool() {
+ return CSD_POOL;
+ }
+
+ @Override
+ public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
+ return ECB_POOL;
+ }
+ };
+
+ private final OrcMetadataCache metadataCache;
+ private final LowLevelCache lowLevelCache;
+ private final Configuration conf;
+ private final Cache<OrcCacheKey> cache;
+ private final FileSplit split;
+ private List<Integer> columnIds;
+ private final SearchArgument sarg;
+ private final String[] columnNames;
+ private final OrcEncodedDataConsumer consumer;
+ private final QueryFragmentCounters counters;
+
+ // Read state.
+ private int stripeIxFrom;
+ private OrcFileMetadata fileMetadata;
+ private Reader orcReader;
+ private MetadataReader metadataReader;
+ private EncodedReader stripeReader;
+ private long fileId;
+ private FileSystem fs;
+ /**
+ * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
+ * read. Contains only stripes that are read, and only columns included. null => read all RGs.
+ */
+ private boolean[][][] readState;
+ private volatile boolean isStopped = false;
+ @SuppressWarnings("unused")
+ private volatile boolean isPaused = false;
+
+ public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
+ OrcMetadataCache metadataCache, Configuration conf, InputSplit split,
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+ OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
+ this.lowLevelCache = lowLevelCache;
+ this.metadataCache = metadataCache;
+ this.cache = cache;
+ this.conf = conf;
+ this.split = (FileSplit)split;
+ this.columnIds = columnIds;
+ if (this.columnIds != null) {
+ Collections.sort(this.columnIds);
+ }
+ this.sarg = sarg;
+ this.columnNames = columnNames;
+ this.consumer = consumer;
+ this.counters = counters;
+ }
+
+ @Override
+ public void stop() {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Encoded reader is being stopped");
+ }
+ isStopped = true;
+ }
+
+ @Override
+ public void pause() {
+ isPaused = true;
+ // TODO: pause fetching
+ }
+
+ @Override
+ public void unpause() {
+ isPaused = false;
+ // TODO: unpause fetching
+ }
+
+ @Override
+ protected Void callInternal() throws IOException {
+ long startTime = counters.startTimeCounter();
+ if (LlapIoImpl.LOGL.isInfoEnabled()) {
+ LlapIoImpl.LOG.info("Processing data for " + split.getPath());
+ }
+ if (processStop()) {
+ recordReaderTime(startTime);
+ return null;
+ }
+ counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
+ orcReader = null;
+ // 1. Get file metadata from cache, or create the reader and read it.
+ // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
+ fs = split.getPath().getFileSystem(conf);
+ fileId = determineFileId(fs, split);
+ counters.setDesc(QueryFragmentCounters.Desc.FILE, fileId);
+
+ try {
+ fileMetadata = getOrReadFileMetadata();
+ consumer.setFileMetadata(fileMetadata);
+ validateFileMetadata();
+ if (columnIds == null) {
+ columnIds = createColumnIds(fileMetadata);
+ }
+
+ // 2. Determine which stripes to read based on the split.
+ determineStripesToRead();
+ } catch (Throwable t) {
+ recordReaderTime(startTime);
+ consumer.setError(t);
+ return null;
+ }
+
+ if (readState.length == 0) {
+ consumer.setDone();
+ recordReaderTime(startTime);
+ return null; // No data to read.
+ }
+ counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length);
+
+ // 3. Apply SARG if needed, and otherwise determine what RGs to read.
+ int stride = fileMetadata.getRowIndexStride();
+ ArrayList<OrcStripeMetadata> stripeMetadatas = null;
+ boolean[] globalIncludes = null;
+ boolean[] sargColumns = null;
+ try {
+ globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
+ if (sarg != null && stride != 0) {
+ // TODO: move this to a common method
- int[] filterColumns = RecordReaderImpl.mapSargColumns(sarg.getLeaves(), columnNames, 0);
++ int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx(
++ sarg.getLeaves(), columnNames, 0);
+ // included will not be null, row options will fill the array with trues if null
+ sargColumns = new boolean[globalIncludes.length];
+ for (int i : filterColumns) {
+ // filter columns may have -1 as index which could be partition column in SARG.
+ if (i > 0) {
+ sargColumns[i] = true;
+ }
+ }
+
+ // If SARG is present, get relevant stripe metadata from cache or readers.
+ stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns);
+ }
+
+ // Now, apply SARG if any; w/o sarg, this will just initialize readState.
+ boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas);
+ if (!hasData) {
+ consumer.setDone();
+ recordReaderTime(startTime);
+ return null; // No data to read.
+ }
+ } catch (Throwable t) {
+ cleanupReaders();
+ consumer.setError(t);
+ recordReaderTime(startTime);
+ return null;
+ }
+
+ if (processStop()) {
+ cleanupReaders();
+ recordReaderTime(startTime);
+ return null;
+ }
+
+ // 4. Get data from high-level cache.
+ // If some cols are fully in cache, this will also give us the modified list of columns to
+ // read for every stripe (null means read all of them - the usual path). In any case,
+ // readState will be modified for column x rgs that were fetched from high-level cache.
+ List<Integer>[] stripeColsToRead = null;
+ if (cache != null) {
+ try {
+ stripeColsToRead = produceDataFromCache(stride);
+ } catch (Throwable t) {
+ // produceDataFromCache handles its own cleanup.
+ consumer.setError(t);
+ cleanupReaders();
+ recordReaderTime(startTime);
+ return null;
+ }
+ }
+
+ // 5. Create encoded data reader.
+ // In case if we have high-level cache, we will intercept the data and add it there;
+ // otherwise just pass the data directly to the consumer.
+ Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this;
+ try {
+ ensureOrcReader();
+ // Reader creating updates HDFS counters, don't do it here.
+ DataWrapperForOrc dw = new DataWrapperForOrc();
+ stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY);
+ stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
+ } catch (Throwable t) {
+ consumer.setError(t);
+ recordReaderTime(startTime);
+ cleanupReaders();
+ return null;
+ }
+
+ // 6. Read data.
+ // TODO: I/O threadpool could be here - one thread per stripe; for now, linear.
+ OrcBatchKey stripeKey = new OrcBatchKey(fileId, -1, 0);
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+ if (processStop()) {
+ cleanupReaders();
+ recordReaderTime(startTime);
+ return null;
+ }
+ int stripeIx = stripeIxFrom + stripeIxMod;
+ boolean[][] colRgs = null;
+ boolean[] stripeIncludes = null;
+ OrcStripeMetadata stripeMetadata = null;
+ StripeInformation stripe;
+ try {
+ List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
+ if (cols != null && cols.isEmpty()) continue; // No need to read this stripe.
+ stripe = fileMetadata.getStripes().get(stripeIx);
+
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": "
+ + stripe.getOffset() + ", " + stripe.getLength());
+ }
+ colRgs = readState[stripeIxMod];
+ // We assume that NO_RGS value is only set from SARG filter and for all columns;
+ // intermediate changes for individual columns will unset values in the array.
+ // Skip this case for 0-column read. We could probably special-case it just like we do
+ // in EncodedReaderImpl, but for now it's not that important.
+ if (colRgs.length > 0 && colRgs[0] == SargApplier.READ_NO_RGS) continue;
+
+ // 6.1. Determine the columns to read (usually the same as requested).
+ if (cache == null || cols == null || cols.size() == colRgs.length) {
+ cols = columnIds;
+ stripeIncludes = globalIncludes;
+ } else {
+ // We are reading subset of the original columns, remove unnecessary bitmasks/etc.
+ // This will never happen w/o high-level cache.
+ stripeIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), cols, true);
+ colRgs = genStripeColRgs(cols, colRgs);
+ }
+
+ // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering.
+ boolean isFoundInCache = false;
+ if (stripeMetadatas != null) {
+ stripeMetadata = stripeMetadatas.get(stripeIxMod);
+ } else {
+ stripeKey.stripeIx = stripeIx;
+ stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
+ isFoundInCache = (stripeMetadata != null);
+ if (!isFoundInCache) {
+ counters.incrCounter(Counter.METADATA_CACHE_MISS);
+ ensureMetadataReader();
+ long startTimeHdfs = counters.startTimeCounter();
+ stripeMetadata = new OrcStripeMetadata(
+ stripeKey, metadataReader, stripe, stripeIncludes, sargColumns);
+ counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
+ stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
+ + " metadata with includes: " + DebugUtils.toString(stripeIncludes));
+ }
+ stripeKey = new OrcBatchKey(fileId, -1, 0);
+ }
+ consumer.setStripeMetadata(stripeMetadata);
+ }
+ if (!stripeMetadata.hasAllIndexes(stripeIncludes)) {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
+ + " metadata for includes: " + DebugUtils.toString(stripeIncludes));
+ }
+ assert isFoundInCache;
+ counters.incrCounter(Counter.METADATA_CACHE_MISS);
+ ensureMetadataReader();
+ updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns);
+ } else if (isFoundInCache) {
+ counters.incrCounter(Counter.METADATA_CACHE_HIT);
+ }
+ } catch (Throwable t) {
+ consumer.setError(t);
+ cleanupReaders();
+ recordReaderTime(startTime);
+ return null;
+ }
+ if (processStop()) {
+ cleanupReaders();
+ recordReaderTime(startTime);
+ return null;
+ }
+
+ // 6.3. Finally, hand off to the stripe reader to produce the data.
+ // This is a sync call that will feed data to the consumer.
+ try {
+ // TODO: readEncodedColumns is not supposed to throw; errors should be propagated thru
+ // consumer. It is potentially holding locked buffers, and must perform its own cleanup.
+ // Also, currently readEncodedColumns is not stoppable. The consumer will discard the
+ // data it receives for one stripe. We could probably interrupt it, if it checked that.
+ stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
+ stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes,
+ colRgs, dataConsumer);
+ } catch (Throwable t) {
+ consumer.setError(t);
+ cleanupReaders();
+ recordReaderTime(startTime);
+ return null;
+ }
+ }
+
+ // Done with all the things.
+ recordReaderTime(startTime);
+ dataConsumer.setDone();
+ if (DebugUtils.isTraceMttEnabled()) {
+ LlapIoImpl.LOG.info("done processing " + split);
+ }
+
+ // Close the stripe reader, we are done reading.
+ cleanupReaders();
+ return null;
+ }
+
+ private void recordReaderTime(long startTime) { // startTime is presumably a value from counters.startTimeCounter() - confirm at call sites.
+ counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime); // Charges the time measured from startTime to the total IO time counter.
+ }
+
+ private static String getDbAndTableName(Path path) {
+ // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's
+ // actually pretty hard to get cause even split generator only uses paths. We only need this
+ // for metrics; therefore, brace for BLACK MAGIC!
+ String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
+ int dbIx = -1;
+ // Try to find the default db postfix; don't check two last components - at least there
+ // should be a table and file (we could also try to throw away partition/bucket/acid stuff).
+ for (int i = 0; i < parts.length - 2; ++i) {
+ if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue;
+ if (dbIx >= 0) {
+ dbIx = -1; // Let's not guess.
+ break;
+ }
+ dbIx = i;
+ }
+ if (dbIx >= 0) {
+ return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
+ }
+
+ // Just go from the back and throw away everything we think is wrong; skip last item, the file.
+ boolean isInPartFields = false;
+ for (int i = parts.length - 2; i >= 0; --i) {
+ String p = parts[i];
+ boolean isPartField = p.contains("=");
+ if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX)
+ && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) {
+ dbIx = i - 1;
+ break;
+ }
+ isInPartFields = isPartField;
+ }
+ // If we found something before we ran out of components, use it.
+ if (dbIx >= 0) {
+ String dbName = parts[dbIx];
+ if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) {
+ dbName = dbName.substring(0, dbName.length() - 3);
+ }
+ return dbName + "." + parts[dbIx + 1];
+ }
+ return "unknown";
+ }
+
+ /**
+  * Warns if the file's ORC compression buffer size is smaller than the minimum allocation size
+  * configured for the LLAP low-level cache. Files with CompressionKind.NONE are not checked.
+  */
+ private void validateFileMetadata() throws IOException {
+   boolean isCompressed = fileMetadata.getCompressionKind() != CompressionKind.NONE;
+   if (isCompressed) {
+     int orcBufferSize = fileMetadata.getCompressionBufferSize();
+     int cacheMinAlloc = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+     if (orcBufferSize < cacheMinAlloc) {
+       String settingName = HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString();
+       LOG.warn("ORC compression buffer size (" + orcBufferSize + ") is smaller than LLAP low-level "
+           + "cache minimum allocation size (" + cacheMinAlloc + "). Decrease the value for "
+           + settingName + " to avoid wasting memory");
+     }
+   }
+ }
+
+ /**
+  * Checks whether the reader has been asked to stop and, if so, closes the readers.
+  * @return true if isStopped was set (the readers have been cleaned up); false otherwise.
+  */
+ private boolean processStop() {
+   if (isStopped) {
+     LOG.info("Encoded data reader is stopping");
+     cleanupReaders();
+     return true;
+   }
+   return false;
+ }
+
+ /**
+  * Determines the file ID for a split: the ID carried by the split when it is an OrcSplit with
+  * a non-null file ID; otherwise a warning is logged and the ID is obtained from HdfsUtils for
+  * the split's path.
+  */
+ private static long determineFileId(FileSystem fs, FileSplit split) throws IOException {
+   Long idFromSplit = null;
+   if (split instanceof OrcSplit) {
+     idFromSplit = ((OrcSplit)split).getFileId();
+   }
+   if (idFromSplit == null) {
+     LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
+     return HdfsUtils.getFileId(fs, split.getPath());
+   }
+   return idFromSplit;
+ }
+
+ /**
+  * Compacts the per-column RG masks of a stripe by dropping the null entries, keeping the
+  * remaining masks in their original order.
+  * @param stripeCols Columns that will be read for the stripe; its size is the result length.
+  * @param globalColRgs Per-column RG masks for the originally requested columns; null entries
+  *                     are skipped.
+  * @return Array of stripeCols.size() masks, filled from index 0 with the non-null input masks.
+  */
+ private boolean[][] genStripeColRgs(List<Integer> stripeCols, boolean[][] globalColRgs) {
+   boolean[][] stripeColRgs = new boolean[stripeCols.size()][];
+   // The write index has to start at 0 (starting at -1 made the first copy go out of bounds);
+   // it only advances when a mask is actually copied.
+   int nextIx = 0;
+   for (boolean[] colRgs : globalColRgs) {
+     if (colRgs == null) continue;
+     stripeColRgs[nextIx++] = colRgs;
+   }
+   return stripeColRgs;
+ }
+
+ /**
+  * Builds the column list to use when all columns are read: every type index from the file
+  * metadata except index 0.
+  */
+ private static List<Integer> createColumnIds(OrcFileMetadata metadata) {
+   int typeCount = metadata.getTypes().size();
+   List<Integer> allIds = new ArrayList<Integer>(typeCount);
+   int colId = 1;
+   while (colId < typeCount) {
+     allIds.add(colId);
+     ++colId;
+   }
+   return allIds;
+ }
+
+ /**
+  * Loads into a cached stripe metadata object the indexes that the current query needs but the
+  * object does not have yet. This is a temporary kludge until a real metadata cache becomes
+  * available.
+  */
+ private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata,
+     StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException {
+   // We only synchronize on write for now - design of metadata cache is very temporary;
+   // we pre-allocate the array and never remove entries; so readers should be safe.
+   synchronized (stripeMetadata) {
+     // Check again under the lock; the indexes may have been loaded in the meantime.
+     boolean isMissingIndexes = !stripeMetadata.hasAllIndexes(stripeIncludes);
+     if (isMissingIndexes) {
+       long hdfsStart = counters.startTimeCounter();
+       stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns);
+       counters.incrTimeCounter(Counter.HDFS_TIME_US, hdfsStart);
+     }
+   }
+ }
+
+ /**
+  * Closes the metadata reader and the stripe reader, if they exist. An IOException thrown by
+  * either close() is deliberately ignored, so one failure does not prevent the other close.
+  */
+ private void cleanupReaders() {
+   try {
+     if (metadataReader != null) {
+       metadataReader.close();
+     }
+   } catch (IOException ignored) {
+     // Ignore.
+   }
+   try {
+     if (stripeReader != null) {
+       stripeReader.close();
+     }
+   } catch (IOException ignored) {
+     // Ignore.
+   }
+ }
+
+ /**
+  * Ensures orcReader is initialized for the split; does nothing if it already exists.
+  * Creating the reader is counted as HDFS time.
+  */
+ private void ensureOrcReader() throws IOException {
+   if (orcReader == null) {
+     Path readPath = HdfsUtils.getFileIdPath(fs, split.getPath(), fileId);
+     if (DebugUtils.isTraceOrcEnabled()) {
+       LOG.info("Creating reader for " + readPath + " (" + split.getPath() + ")");
+     }
+     long hdfsStart = counters.startTimeCounter();
+     ReaderOptions readerOpts =
+         OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
+     orcReader = EncodedOrcFile.createReader(readPath, readerOpts);
+     counters.incrTimeCounter(Counter.HDFS_TIME_US, hdfsStart);
+   }
+ }
+
+ /**
+  * Returns the file metadata for the split: the cached object on a metadata cache hit, or
+  * metadata built from the ORC reader and put into the cache on a miss. The hit/miss counter
+  * is updated accordingly.
+  */
+ private OrcFileMetadata getOrReadFileMetadata() throws IOException {
+   OrcFileMetadata cached = metadataCache.getFileMetadata(fileId);
+   if (cached == null) {
+     counters.incrCounter(Counter.METADATA_CACHE_MISS);
+     ensureOrcReader();
+     // We assume this call doesn't touch HDFS because everything is already read; don't add time.
+     OrcFileMetadata fresh = new OrcFileMetadata(fileId, orcReader);
+     // Return what the cache hands back rather than the local object.
+     return metadataCache.putFileMetadata(fresh);
+   }
+   counters.incrCounter(Counter.METADATA_CACHE_HIT);
+   return cached;
+ }
+
+ /**
+ * Gets the metadata for every stripe selected for this split (readState.length stripes,
+ * starting at stripeIxFrom), using the metadata cache where possible. Stripes missing from
+ * the cache are read via metadataReader and put into the cache; cached entries that lack some
+ * of the indexes required by globalInc are updated via updateLoadedIndexes. Each stripe's
+ * metadata is also passed to consumer.setStripeMetadata.
+ * @param globalInc Included-columns mask, passed to OrcStripeMetadata and hasAllIndexes.
+ * @param sargColumns Passed through to OrcStripeMetadata and updateLoadedIndexes.
+ * @return Stripe metadata, indexed by the stripe's position within the split (stripeIxMod).
+ */
+ private ArrayList<OrcStripeMetadata> readStripesMetadata(
+ boolean[] globalInc, boolean[] sargColumns) throws IOException {
+ ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length);
+ OrcBatchKey stripeKey = new OrcBatchKey(fileId, 0, 0); // Mutable key, re-pointed at each stripe for lookups.
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+ stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
+ OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
+ if (value == null || !value.hasAllIndexes(globalInc)) { // Absent, or present without all needed indexes: both count as a miss.
+ counters.incrCounter(Counter.METADATA_CACHE_MISS);
+ ensureMetadataReader();
+ StripeInformation si = fileMetadata.getStripes().get(stripeKey.stripeIx);
+ if (value == null) {
+ long startTime = counters.startTimeCounter();
+ value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns);
+ counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+ value = metadataCache.putStripeMetadata(value); // Continue with whatever object the cache returns.
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
+ + " metadata with includes: " + DebugUtils.toString(globalInc));
+ }
+ // Create new key object to reuse for gets; we've used the old one to put in cache.
+ stripeKey = new OrcBatchKey(fileId, 0, 0);
+ }
+ // We might have got an old value from cache; recheck it has indexes.
+ if (!value.hasAllIndexes(globalInc)) {
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
+ + " metadata for includes: " + DebugUtils.toString(globalInc));
+ }
+ updateLoadedIndexes(value, si, globalInc, sargColumns);
+ }
+ } else {
+ counters.incrCounter(Counter.METADATA_CACHE_HIT);
+ }
+ result.add(value);
+ consumer.setStripeMetadata(value);
+ }
+ return result;
+ }
+
+ /**
+  * Ensures metadataReader is initialized. The ORC reader is always ensured first; obtaining
+  * the metadata reader from it is counted as HDFS time.
+  */
+ private void ensureMetadataReader() throws IOException {
+   ensureOrcReader();
+   if (metadataReader == null) {
+     long hdfsStart = counters.startTimeCounter();
+     metadataReader = orcReader.metadata();
+     counters.incrTimeCounter(Counter.HDFS_TIME_US, hdfsStart);
+   }
+ }
+
+ @Override
+ public void returnData(OrcEncodedColumnBatch ecb) {
+ for (ColumnStreamData[] datas : ecb.getColumnData()) {
+ if (datas == null) continue;
+ for (ColumnStreamData data : datas) {
+ if (data == null || data.decRef() != 0) continue;
+ if (DebugUtils.isTraceLockingEnabled()) {
+ for (MemoryBuffer buf : data.getCacheBuffers()) {
+ LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
+ }
+ }
+ lowLevelCache.releaseBuffers(data.getCacheBuffers());
+ CSD_POOL.offer(data);
+ }
+ }
+ // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
+ ECB_POOL.offer(ecb);
+ }
+
+ /**
+ * Determines which RGs need to be read, after stripes have been determined.
+ * SARG is applied, and readState is populated for each stripe accordingly: every requested
+ * column of a stripe gets the stripe's RG mask (the shared READ_ALL_RGS / READ_NO_RGS sentinel,
+ * or its own copy of the mask picked by the SARG).
+ * @param globalIncludes Included-columns mask; used to derive the SARG column names.
+ * @param rowIndexStride Row index stride of the file; when it is 0, or there is no SARG, no
+ *                       SargApplier is created and rgsToRead stays null for every stripe.
+ * @param metadata Stripe metadata indexed by stripe position within the split; only accessed
+ *                 when a SargApplier was created.
+ * @return Whether any stripe was left with something other than READ_NO_RGS.
+ */
+ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride,
+ ArrayList<OrcStripeMetadata> metadata) throws IOException {
+ SargApplier sargApp = null;
+ if (sarg != null && rowIndexStride != 0) {
+ List<OrcProto.Type> types = fileMetadata.getTypes();
+ String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
+ columnNames, types, globalIncludes, fileMetadata.isOriginalFormat());
+ sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, globalIncludes.length);
+ }
+ boolean hasAnyData = false;
+ // readState should have been initialized by this time with an empty array.
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+ int stripeIx = stripeIxMod + stripeIxFrom;
+ StripeInformation stripe = fileMetadata.getStripes().get(stripeIx);
+ int rgCount = getRgCount(stripe, rowIndexStride);
+ boolean[] rgsToRead = null;
+ if (sargApp != null) {
+ OrcStripeMetadata stripeMetadata = metadata.get(stripeIxMod);
+ rgsToRead = sargApp.pickRowGroups(stripe, stripeMetadata.getRowIndexes(),
+ stripeMetadata.getBloomFilterIndexes(), true);
+ }
+ boolean isNone = rgsToRead == SargApplier.READ_NO_RGS, // Identity comparison against the sentinel arrays, not a content check.
+ isAll = rgsToRead == SargApplier.READ_ALL_RGS;
+ hasAnyData = hasAnyData || !isNone;
+ if (DebugUtils.isTraceOrcEnabled()) {
+ if (isNone) {
+ LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx);
+ } else if (!isAll) {
+ LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": "
+ + DebugUtils.toString(rgsToRead));
+ } else {
+ LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx);
+ }
+ }
+ assert isAll || isNone || rgsToRead.length == rgCount;
+ readState[stripeIxMod] = new boolean[columnIds.size()][];
+ for (int j = 0; j < columnIds.size(); ++j) {
+ readState[stripeIxMod][j] = (isAll || isNone) ? rgsToRead : // Sentinels are shared as-is; a real mask is copied per column.
+ Arrays.copyOf(rgsToRead, rgsToRead.length);
+ }
+
+ adjustRgMetric(rgCount, rgsToRead, isNone, isAll);
+ }
+ return hasAnyData;
+ }
+
+ /**
+  * Sets the SELECTED_ROWGROUPS counter for one stripe's RG selection.
+  * @param rgCount Total number of RGs in the stripe; used when all RGs are read.
+  * @param rgsToRead RG mask; its true entries are counted when isAll is false.
+  * @param isNone Whether no RGs will be read.
+  * @param isAll Whether all RGs will be read.
+  */
+ private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone,
+     boolean isAll) {
+   int selected = 0;
+   if (isAll) {
+     if (!isNone) {
+       selected = rgCount;
+     }
+   } else {
+     // An explicit mask (possibly the empty "none" one): count the RGs that are set.
+     for (int rgIx = 0; rgIx < rgsToRead.length; ++rgIx) {
+       if (rgsToRead[rgIx]) {
+         ++selected;
+       }
+     }
+   }
+   counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, selected);
+ }
+
+
+ private int getRgCount(StripeInformation stripe, int rowIndexStride) { // Number of row groups in the stripe: rows / stride, rounded up.
+ return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); // NOTE: floating-point division, so a stride of 0 does not throw; it yields Integer.MAX_VALUE (or 0 when the stripe has no rows).
+ }
+
+ /**
+  * Determine which stripes to read for a split. Populates stripeIxFrom and readState.
+  * <p>
+  * Stripes are taken starting with the first one whose offset is not below the split start, up
+  * to (excluding) the first one whose offset is at or beyond the split end. If no stripe starts
+  * at or after the split start, the split is empty: stripeIxFrom stays -1 and readState is set
+  * to an empty array.
+  */
+ public void determineStripesToRead() {
+   // The unit of caching for ORC is (rg x column) (see OrcBatchKey).
+   List<StripeInformation> stripes = fileMetadata.getStripes();
+   long offset = split.getStart(), maxOffset = offset + split.getLength();
+   stripeIxFrom = -1;
+   int stripeIxTo = -1;
+   if (LlapIoImpl.LOGL.isDebugEnabled()) {
+     StringBuilder tmp = new StringBuilder();
+     tmp.append("FileSplit {").append(split.getStart()).append(", ")
+         .append(split.getLength()).append("}; stripes ");
+     for (StripeInformation stripe : stripes) {
+       tmp.append("{").append(stripe.getOffset()).append(", ")
+           .append(stripe.getLength()).append("}, ");
+     }
+     LlapIoImpl.LOG.debug(tmp.toString());
+   }
+
+   int stripeIx = 0;
+   for (StripeInformation stripe : stripes) {
+     long stripeStart = stripe.getOffset();
+     if (offset > stripeStart) {
+       // We assume splits will never start in the middle of the stripe.
+       ++stripeIx;
+       continue;
+     }
+     if (stripeIxFrom == -1) {
+       if (DebugUtils.isTraceOrcEnabled()) {
+         LlapIoImpl.LOG.info("Including stripes from " + stripeIx
+             + " (" + stripeStart + " >= " + offset + ")");
+       }
+       stripeIxFrom = stripeIx;
+     }
+     if (stripeStart >= maxOffset) {
+       stripeIxTo = stripeIx;
+       if (DebugUtils.isTraceOrcEnabled()) {
+         LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart
+             + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes");
+       }
+       break;
+     }
+     ++stripeIx;
+   }
+   if (stripeIxFrom == -1) {
+     // No stripe starts at or after the split start, so there is nothing to read. Without this
+     // check, readState would be sized (stripes.size() + 1) and indexed from stripe -1.
+     if (DebugUtils.isTraceOrcEnabled()) {
+       LlapIoImpl.LOG.info("Not including any stripes - empty split");
+     }
+     readState = new boolean[0][][];
+     return;
+   }
+   if (stripeIxTo == -1) {
+     stripeIxTo = stripeIx;
+     if (DebugUtils.isTraceOrcEnabled()) {
+       LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); "
+           + (stripeIxTo - stripeIxFrom) + " stripes");
+     }
+   }
+   readState = new boolean[stripeIxTo - stripeIxFrom][][];
+ }
+
+ // TODO: split by stripe? we do everything by stripe, and it might be faster
+ /**
+ * Takes the data from high-level cache for all stripes and returns to consumer.
+ * For every stripe of the split and every RG in it, each column whose read mask still
+ * includes that RG is looked up in the cache. Columns found are attached to a batch taken
+ * from ECB_POOL and their mask entry for the RG is cleared, so the RG will not be read from
+ * disk; a READ_ALL_RGS mask is first replaced with an explicit all-true mask. Batches that
+ * got any cached data are passed to consumer.consumeData. Note that this modifies readState.
+ * @param rowIndexStride Row index stride, used to compute the number of RGs per stripe.
+ * @return List of columns to read per stripe, if any columns were fully eliminated by cache.
+ *         An entry is left null when no column of that stripe was fully served from cache.
+ */
+ private List<Integer>[] produceDataFromCache(int rowIndexStride) throws IOException {
+ OrcCacheKey key = new OrcCacheKey(fileId, -1, -1, -1); // Single mutable key, re-pointed at each (stripe, RG, column) lookup.
+ // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
+ @SuppressWarnings("unchecked")
+ List<Integer>[] stripeColsNotInCache = new List[readState.length];
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+ key.stripeIx = stripeIxFrom + stripeIxMod;
+ boolean[][] cols = readState[stripeIxMod];
+ boolean[] isMissingAnyRgs = new boolean[cols.length]; // Per column: was any needed RG absent from the cache?
+ int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
+ for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
+ OrcEncodedColumnBatch col = ECB_POOL.take();
+ col.init(fileId, key.stripeIx, rgIx, cols.length);
+ boolean hasAnyCached = false;
+ try {
+ key.rgIx = rgIx;
+ for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+ boolean[] readMask = cols[colIxMod];
+ // Check if RG is eliminated by SARG
+ if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS
+ && (readMask.length <= rgIx || !readMask[rgIx]))) continue;
+ key.colIx = columnIds.get(colIxMod);
+ ColumnStreamData[] cached = cache.get(key);
+ if (cached == null) {
+ isMissingAnyRgs[colIxMod] = true;
+ continue;
+ }
+ assert cached.length == OrcEncodedColumnBatch.MAX_DATA_STREAMS;
+ col.setAllStreamsData(colIxMod, key.colIx, cached);
+ hasAnyCached = true;
+ if (readMask == SargApplier.READ_ALL_RGS) {
+ // We were going to read all RGs, but some were in cache, allocate the mask.
+ cols[colIxMod] = readMask = new boolean[totalRgCount];
+ Arrays.fill(readMask, true);
+ }
+ readMask[rgIx] = false; // Got from cache, don't read from disk.
+ }
+ } catch (Throwable t) {
+ // TODO: Any cleanup needed to release data in col back to cache should be here.
+ throw (t instanceof IOException) ? (IOException)t : new IOException(t);
+ }
+ if (hasAnyCached) { // NOTE(review): when nothing was cached, col is neither consumed nor offered back to ECB_POOL here.
+ consumer.consumeData(col);
+ }
+ }
+ boolean makeStripeColList = false; // By default assume we'll fetch all original columns.
+ for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+ if (isMissingAnyRgs[colIxMod]) {
+ if (makeStripeColList) {
+ stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
+ }
+ } else if (!makeStripeColList) {
+ // Some columns were fully in cache. Make a per-stripe col list, add previous columns.
+ makeStripeColList = true;
+ stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1);
+ for (int i = 0; i < colIxMod; ++i) {
+ stripeColsNotInCache[stripeIxMod].add(columnIds.get(i));
+ }
+ }
+ }
+ }
+ return stripeColsNotInCache;
+ }
+
+ @Override
+ public void setDone() { // Forwards the completion signal to the wrapped consumer.
+ consumer.setDone();
+ }
+
+ @Override
+ public void consumeData(OrcEncodedColumnBatch data) {
+ // Not implemented: this always throws. The disabled code below sketches the intended caching.
+ assert cache != null;
+ throw new UnsupportedOperationException("not implemented");
+ /* Disabled: store object in cache; create new key object - cannot be reused.
+ for (int i = 0; i < data.getColumnData().length; ++i) {
+ OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]);
+ ColumnStreamData[] toCache = data.getColumnData()[i];
+ ColumnStreamData[] cached = cache.cacheOrGet(key, toCache);
+ if (toCache != cached) {
+ for (ColumnStreamData sb : toCache) {
+ if (sb.decRef() != 0) continue;
+ lowLevelCache.releaseBuffers(sb.getCacheBuffers());
+ }
+ data.getColumnData()[i] = cached;
+ }
+ }
+ consumer.consumeData(data);*/
+ }
+
+ @Override
+ public void setError(Throwable t) { // Forwards the error to the wrapped consumer unchanged.
+ consumer.setError(t);
+ }
+
+ /**
+  * Combined DataReader/DataCache implementation. The DataCache methods delegate to the
+  * enclosing reader's low-level cache; the DataReader methods delegate to a default ORC data
+  * reader created from orcReader, recording HDFS time around open and read calls.
+  */
+ private class DataWrapperForOrc implements DataReader, DataCache {
+   /** Default ORC data reader that all DataReader calls are forwarded to. */
+   private DataReader orcDataReader;
+
+   /**
+    * Creates the underlying ORC data reader, with zero-copy if the configuration asks for it.
+    * @throws UnsupportedOperationException if zero-copy is requested but the cache allocator
+    *         does not use direct allocation.
+    */
+   public DataWrapperForOrc() {
+     boolean useZeroCopy = false;
+     if (conf != null) {
+       useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
+     }
+     if (useZeroCopy) {
+       // The allocator is only consulted when zero-copy was actually requested.
+       if (!lowLevelCache.getAllocator().isDirectAlloc()) {
+         throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
+             + "buffers; either disable zero-copy or enable direct cache allocation");
+       }
+     }
+     this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy);
+   }
+
+   /** Looks the ranges up in the low-level cache, passing along the reader's counters. */
+   @Override
+   public DiskRangeList getFileData(long fileId, DiskRangeList range,
+       long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
+     return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
+   }
+
+   /** Puts the buffers into the low-level cache with NORMAL priority. */
+   @Override
+   public long[] putFileData(long fileId, DiskRange[] ranges,
+       MemoryBuffer[] data, long baseOffset) {
+     return lowLevelCache.putFileData(
+         fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
+   }
+
+   /** Releases a cache buffer via the low-level cache. */
+   @Override
+   public void releaseBuffer(MemoryBuffer buffer) {
+     lowLevelCache.releaseBuffer(buffer);
+   }
+
+   /** Asks the low-level cache to reuse the buffer; the call is expected to succeed. */
+   @Override
+   public void reuseBuffer(MemoryBuffer buffer) {
+     // Keep the call outside of the assert so it runs even when assertions are disabled.
+     boolean isReused = lowLevelCache.reuseBuffer(buffer);
+     assert isReused;
+   }
+
+   /** Returns the allocator of the low-level cache. */
+   @Override
+   public Allocator getAllocator() {
+     return lowLevelCache.getAllocator();
+   }
+
+   /** Closes the underlying ORC data reader. */
+   @Override
+   public void close() throws IOException {
+     orcDataReader.close();
+   }
+
+   /** Reads the ranges via the ORC data reader, recording HDFS time and tracing the result. */
+   @Override
+   public DiskRangeList readFileData(DiskRangeList range, long baseOffset,
+       boolean doForceDirect) throws IOException {
+     long hdfsStart = counters.startTimeCounter();
+     DiskRangeList readRanges = orcDataReader.readFileData(range, baseOffset, doForceDirect);
+     counters.recordHdfsTime(hdfsStart);
+     boolean shouldTrace = DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled();
+     if (shouldTrace) {
+       LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset
+           + "): " + RecordReaderUtils.stringifyDiskRanges(readRanges));
+     }
+     return readRanges;
+   }
+
+   /** Delegates to the ORC data reader. */
+   @Override
+   public boolean isTrackingDiskRanges() {
+     return orcDataReader.isTrackingDiskRanges();
+   }
+
+   /** Releases a ByteBuffer via the ORC data reader. */
+   @Override
+   public void releaseBuffer(ByteBuffer buffer) {
+     orcDataReader.releaseBuffer(buffer);
+   }
+
+   /** Opens the ORC data reader, recording the time spent as HDFS time. */
+   @Override
+   public void open() throws IOException {
+     long hdfsStart = counters.startTimeCounter();
+     orcDataReader.open();
+     counters.recordHdfsTime(hdfsStart);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 5770bef,2500fb6..ffeaaa0
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@@ -56,9 -56,9 +56,10 @@@ import org.apache.hadoop.hive.ql.io.Aci
import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+ import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@@ -264,10 -265,9 +266,9 @@@ public class OrcInputFormat implement
boolean[] result = new boolean[numColumns];
result[0] = true;
OrcProto.Type root = types.get(rootColumn);
- for(int i=0; i < root.getSubtypesCount(); ++i) {
+ for (int i = 0; i < root.getSubtypesCount(); ++i) {
if (included.contains(i)) {
- includeColumnRecursive(types, result, root.getSubtypes(i),
- rootColumn);
+ includeColumnRecursive(types, result, root.getSubtypes(i), rootColumn);
}
}
return result;
@@@ -866,33 -901,11 +904,11 @@@
// we can't eliminate stripes if there are deltas because the
// deltas may change the rows making them match the predicate.
- if (deltas.isEmpty()) {
- Reader.Options options = new Reader.Options();
- options.include(includedCols);
- setSearchArgument(options, types, context.conf, isOriginal);
- // only do split pruning if HIVE-8732 has been fixed in the writer
- if (options.getSearchArgument() != null &&
- writerVersion != OrcFile.WriterVersion.ORIGINAL) {
- SearchArgument sarg = options.getSearchArgument();
- List<PredicateLeaf> sargLeaves = sarg.getLeaves();
- int[] filterColumns = RecordReaderImpl.mapSargColumns(sargLeaves,
- options.getColumnNames(), getRootColumn(isOriginal));
-
- if (stripeStats != null) {
- // eliminate stripes that doesn't satisfy the predicate condition
- includeStripe = new boolean[stripes.size()];
- for (int i = 0; i < stripes.size(); ++i) {
- includeStripe[i] = (i >= stripeStats.size()) ||
- isStripeSatisfyPredicate(stripeStats.get(i), sarg,
- filterColumns);
- if (isDebugEnabled && !includeStripe[i]) {
- LOG.debug("Eliminating ORC stripe-" + i + " of file '" +
- fileWithId.getFileStatus().getPath() + "' as it did not satisfy " +
- "predicate condition.");
- }
- }
- }
- }
+ if (deltas.isEmpty() && canCreateSargFromConf(context.conf)) {
+ SearchArgument sarg = ConvertAstToSearchArg.createFromConf(context.conf);
+ String[] sargColNames = extractNeededColNames(types, context.conf, includedCols, isOriginal);
+ includeStripe = pickStripes(sarg, sargColNames, writerVersion, isOriginal,
- metadata.getStripeStatistics(), stripes.size(), file.getPath());
++ stripeStats, stripes.size(), file.getPath());
}
// if we didn't have predicate pushdown, read everything
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_count_distinct.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_decimal_aggregate.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out
index c974d00,52c00f9..563213a
--- a/ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out
@@@ -143,20 -143,24 +143,32 @@@ STAGE PLANS
Statistics: Num rows: 2000 Data size: 918712 Basic stats: COMPLETE Column stats: NONE
Execution mode: vectorized
Reducer 2
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
- keys: KEY._col0 (type: string), KEY._col1 (type: tinyint)
+ keys: KEY._col0 (type: tinyint), KEY._col1 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
+ Select Operator
+ expressions: _col1 (type: string), _col0 (type: tinyint)
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
++<<<<<<< HEAD
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
++=======
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: vectorized
++>>>>>>> master
Stage: Stage-0
Fetch Operator
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out
index 9687ec1,2255f72..b2402db
--- a/ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out
@@@ -153,13 -152,18 +153,25 @@@ STAGE PLANS
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
+ Select Operator
+ expressions: _col1 (type: string), _col0 (type: tinyint), _col2 (type: bigint)
+ outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
++<<<<<<< HEAD
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
++=======
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: vectorized
++>>>>>>> master
Stage: Stage-0
Fetch Operator
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
index a3a44df,bbc66fc..6308cee
--- a/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
+++ b/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
@@@ -65,22 -65,17 +65,18 @@@ STAGE PLANS
Filter Operator
predicate: l_partkey is not null (type: boolean)
Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: l_partkey (type: int)
+ Group By Operator
+ keys: l_partkey (type: int)
+ mode: hash
outputColumnNames: _col0
Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- keys: _col0 (type: int)
- mode: hash
- outputColumnNames: _col0
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
- Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
Reducer 4
+ Execution mode: vectorized
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
@@@ -270,22 -266,17 +266,18 @@@ STAGE PLANS
Filter Operator
predicate: l_partkey is not null (type: boolean)
Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: l_partkey (type: int)
+ Group By Operator
+ keys: l_partkey (type: int)
+ mode: hash
outputColumnNames: _col0
Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- keys: _col0 (type: int)
- mode: hash
- outputColumnNames: _col0
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
- Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE
Reducer 4
+ Execution mode: vectorized
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_orderby_5.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_0.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_13.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_15.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_short_regress.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
index 7308fb2,316ed63..bf23ae4
--- a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
+++ b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out
@@@ -97,24 -97,19 +97,27 @@@ STAGE PLANS
input vertices:
1 Map 4
Statistics: Num rows: 7433 Data size: 228226 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: _col1 (type: double)
+ Group By Operator
+ aggregations: sum(_col1)
+ mode: hash
outputColumnNames: _col0
- Statistics: Num rows: 7433 Data size: 228226 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- aggregations: sum(_col0)
- mode: hash
- outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
++<<<<<<< HEAD
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: double)
+ Execution mode: vectorized
++=======
+ value expressions: _col0 (type: double)
++>>>>>>> master
Local Work:
Map Reduce Local Work
- Execution mode: vectorized
Reducer 3
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorized_timestamp_funcs.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
index 2e87e2c,63e6ade..8e4a501
--- a/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
+++ b/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
@@@ -1591,37 -1591,37 +1591,37 @@@ STAGE PLANS
Filter Operator
predicate: (t is null or (t = 27)) (type: boolean)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: si (type: smallint), i (type: int), b (type: bigint), f (type: float), t (type: tinyint)
+ Group By Operator
+ keys: t (type: tinyint), si (type: smallint), i (type: int), b (type: bigint), f (type: float)
+ mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- keys: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ Reduce Output Operator
+ key expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float)
+ sort order: +++++
+ Map-reduce partition columns: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- sort order: +++++
- Map-reduce partition columns: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
Execution mode: vectorized
Reducer 2
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
- keys: KEY._col0 (type: smallint), KEY._col1 (type: int), KEY._col2 (type: bigint), KEY._col3 (type: float), KEY._col4 (type: tinyint)
+ keys: KEY._col0 (type: tinyint), KEY._col1 (type: smallint), KEY._col2 (type: int), KEY._col3 (type: bigint), KEY._col4 (type: float)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
+ Select Operator
+ expressions: _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col0 (type: tinyint)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.over1k_part2_orc
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.over1k_part2_orc
- Execution mode: vectorized
Stage: Stage-2
Dependency Collection
@@@ -1670,50 -1669,37 +1669,37 @@@ STAGE PLANS
Filter Operator
predicate: (t is null or (t = 27)) (type: boolean)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: si (type: smallint), i (type: int), b (type: bigint), f (type: float), t (type: tinyint)
+ Group By Operator
+ keys: t (type: tinyint), si (type: smallint), i (type: int), b (type: bigint), f (type: float)
+ mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- keys: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- mode: hash
- outputColumnNames: _col0, _col1, _col2, _col3, _col4
+ Reduce Output Operator
+ key expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float)
+ sort order: +++++
+ Map-reduce partition columns: _col0 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- sort order: +++++
- Map-reduce partition columns: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
Execution mode: vectorized
Reducer 2
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
- keys: KEY._col0 (type: smallint), KEY._col1 (type: int), KEY._col2 (type: bigint), KEY._col3 (type: float), KEY._col4 (type: tinyint)
+ keys: KEY._col0 (type: tinyint), KEY._col1 (type: smallint), KEY._col2 (type: int), KEY._col3 (type: bigint), KEY._col4 (type: float)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col4 (type: tinyint)
- sort order: +
- Map-reduce partition columns: _col4 (type: tinyint)
- Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
- Reducer 3
- Execution mode: vectorized
- Reduce Operator Tree:
- Select Operator
- expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), VALUE._col4 (type: tinyint)
- outputColumnNames: _col0, _col1, _col2, _col3, _col4
- Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
+ Select Operator
+ expressions: _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col0 (type: tinyint)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.over1k_part2_orc
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.over1k_part2_orc
- Execution mode: vectorized
Stage: Stage-2
Dependency Collection
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/mrr.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_binary_join_groupby.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_count_distinct.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_decimal_aggregate.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_decimal_udf.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out
index a1063ab,6c31294..44d207b
--- a/ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out
+++ b/ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out
@@@ -143,20 -143,24 +143,24 @@@ STAGE PLANS
Statistics: Num rows: 2000 Data size: 918712 Basic stats: COMPLETE Column stats: NONE
Execution mode: vectorized
Reducer 2
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
- keys: KEY._col0 (type: string), KEY._col1 (type: tinyint)
+ keys: KEY._col0 (type: tinyint), KEY._col1 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
+ Select Operator
+ expressions: _col1 (type: string), _col0 (type: tinyint)
+ outputColumnNames: _col0, _col1
Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Execution mode: vectorized
Stage: Stage-0
Fetch Operator