You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/03/28 06:41:33 UTC

svn commit: r1582607 [1/3] - in /hive/branches/branch-0.13: itests/qtest/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/ ql/src/test/results/clientpositive/tez/

Author: gunther
Date: Fri Mar 28 05:41:33 2014
New Revision: 1582607

URL: http://svn.apache.org/r1582607
Log:
HIVE-6735: Make scalable dynamic partitioning work in vectorized mode (Prasanth J via Gunther Hagleitner)

Added:
    hive/branches/branch-0.13/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
    hive/branches/branch-0.13/ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out
    hive/branches/branch-0.13/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
Modified:
    hive/branches/branch-0.13/itests/qtest/pom.xml
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java

Modified: hive/branches/branch-0.13/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/itests/qtest/pom.xml?rev=1582607&r1=1582606&r2=1582607&view=diff
==============================================================================
--- hive/branches/branch-0.13/itests/qtest/pom.xml (original)
+++ hive/branches/branch-0.13/itests/qtest/pom.xml Fri Mar 28 05:41:33 2014
@@ -39,7 +39,7 @@
     <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q</minimr.query.files>
     <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q</minimr.query.negative.files>
     <minitez.query.files>mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q</minitez.query.files>
-    <minitez.query.files.shared>dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
+    <minitez.query.files.shared>dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
     <beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
   </properties>
 

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1582607&r1=1582606&r2=1582607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Fri Mar 28 05:41:33 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -35,15 +34,14 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 // import org.apache.hadoop.util.StringUtils;
 
 public class VectorReduceSinkOperator extends ReduceSinkOperator {
@@ -85,10 +83,17 @@ public class VectorReduceSinkOperator ex
   private VectorExpression[] partitionEval;
 
   /**
+  * Evaluators for bucketing columns. This is used to compute bucket number.
+  */
+  private VectorExpression[] bucketEval;
+  private int buckColIdxInKey;
+
+  /**
    * The partition value writers. These know how to write the necessary writable type
    * based on partition column metadata, from the primitive vector type.
    */
   private transient VectorExpressionWriter[] partitionWriters;
+  private transient VectorExpressionWriter[] bucketWriters = null;
 
   public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
       throws HiveException {
@@ -98,6 +103,11 @@ public class VectorReduceSinkOperator ex
     keyEval = vContext.getVectorExpressions(desc.getKeyCols());
     valueEval = vContext.getVectorExpressions(desc.getValueCols());
     partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
+    bucketEval = null;
+    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
+      bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
+      buckColIdxInKey = desc.getPartitionCols().size();
+    }
   }
 
   public VectorReduceSinkOperator() {
@@ -143,6 +153,9 @@ public class VectorReduceSinkOperator ex
           colNames));
 
       partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
+      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
+        bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
+      }
 
       TableDesc valueTableDesc = conf.getValueSerializeInfo();
       valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
@@ -219,6 +232,11 @@ public class VectorReduceSinkOperator ex
       for (int i = 0; i < partitionEval.length; i++) {
         partitionEval[i].evaluate(vrg);
       }
+      if (bucketEval != null) {
+        for (int i = 0; i < bucketEval.length; i++) {
+          bucketEval[i].evaluate(vrg);
+        }
+      }
       // run the vector evaluations
       for (int i = 0; i < valueEval.length; i++) {
          valueEval[i].evaluate(vrg);
@@ -235,6 +253,13 @@ public class VectorReduceSinkOperator ex
         }
         // First, make distrib key components for this row and determine distKeyLength.
         populatedCachedDistributionKeys(vrg, rowIndex, 0);
+
+        // replace bucketing columns with hashcode % numBuckets
+        int buckNum = 0;
+        if (bucketEval != null && bucketEval.length != 0) {
+          buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
+          cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+        }
         HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
         int distKeyLength = firstKey.getDistKeyLength();
         // Add first distinct expression, if any.
@@ -247,7 +272,12 @@ public class VectorReduceSinkOperator ex
           reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
         } else {
         // No TopN, just forward the first key and all others.
-          int hashCode = computeHashCode(vrg, rowIndex);
+          int hashCode = 0;
+          if (bucketEval != null && bucketEval.length != 0) {
+            hashCode = computeHashCode(vrg, rowIndex, buckNum);
+          } else {
+            hashCode = computeHashCode(vrg, rowIndex);
+          }
           firstKey.setHashCode(hashCode);
           BytesWritable value = makeValueWritable(vrg, rowIndex);
           collect(firstKey, value);
@@ -397,6 +427,31 @@ public class VectorReduceSinkOperator ex
     return keyHashCode;
   }
 
+  /** Mixes the row's key hash code (two-argument overload) with its bucket number. */
+  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
+    final int rowKeyHash = computeHashCode(vrg, rowIndex);
+    return rowKeyHash * 31 + buckNum;
+  }
+
+  /**
+   * Computes the bucket number of a row: folds the hash codes of its bucketing column values
+   * (hash = hash * 31 + valueHash) and reduces the result modulo numBuckets. The remainder is
+   * taken before the absolute value because negating Integer.MIN_VALUE overflows and stays
+   * negative, which would otherwise produce a negative bucket number.
+   *
+   * @return the bucket number, never negative
+   */
+  private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
+    int bucketNum = 0;
+    for (int p = 0; p < bucketEval.length; p++) {
+      ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
+      Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
+      bucketNum = bucketNum * 31
+          + ObjectInspectorUtils.hashCode(bucketValue, bucketWriters[p].getObjectInspector());
+    }
+    return Math.abs(bucketNum % numBuckets);
+  }
+
   static public String getOperatorName() {
     return "RS";
   }

Added: hive/branches/branch-0.13/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q?rev=1582607&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q (added)
+++ hive/branches/branch-0.13/ql/src/test/queries/clientpositive/dynpart_sort_opt_vectorization.q Fri Mar 28 05:41:33 2014
@@ -0,0 +1,95 @@
+set hive.optimize.sort.dynamic.partition=true;
+set hive.exec.dynamic.partition=true;
+set hive.exec.max.dynamic.partitions=1000;
+set hive.exec.max.dynamic.partitions.pernode=1000;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=true;
+set hive.enforce.bucketing=false;
+set hive.enforce.sorting=false;
+
+create table over1k(
+           t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+       row format delimited
+       fields terminated by '|';
+
+load data local inpath '../../data/files/over1k' into table over1k;
+
+create table over1k_orc like over1k;
+alter table over1k_orc set fileformat orc;
+insert overwrite table over1k_orc select * from over1k;
+
+create table over1k_part_orc(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (ds string, t tinyint) stored as orc;
+
+create table over1k_part_limit_orc like over1k_part_orc;
+alter table over1k_part_limit_orc set fileformat orc;
+
+create table over1k_part_buck_orc(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (t tinyint)
+       clustered by (si) into 4 buckets stored as orc;
+
+create table over1k_part_buck_sort_orc(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (t tinyint)
+       clustered by (si) 
+       sorted by (f) into 4 buckets stored as orc;
+
+-- map-only jobs converted to map-reduce job by hive.optimize.sort.dynamic.partition optimization
+explain insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
+explain insert overwrite table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert overwrite table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+insert overwrite table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
+insert overwrite table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert overwrite table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+
+-- map-reduce jobs modified by hive.optimize.sort.dynamic.partition optimization
+explain insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
+explain insert into table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+explain insert into table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10;
+insert into table over1k_part_buck_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+insert into table over1k_part_buck_sort_orc partition(t) select si,i,b,f,t from over1k_orc where t is null or t=27;
+
+desc formatted over1k_part_orc partition(ds="foo",t=27);
+desc formatted over1k_part_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+desc formatted over1k_part_limit_orc partition(ds="foo",t=27);
+desc formatted over1k_part_limit_orc partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+desc formatted over1k_part_buck_orc partition(t=27);
+desc formatted over1k_part_buck_orc partition(t="__HIVE_DEFAULT_PARTITION__");
+desc formatted over1k_part_buck_sort_orc partition(t=27);
+desc formatted over1k_part_buck_sort_orc partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select count(*) from over1k_part_orc;
+select count(*) from over1k_part_limit_orc;
+select count(*) from over1k_part_buck_orc;
+select count(*) from over1k_part_buck_sort_orc;