You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/03/17 22:19:58 UTC
svn commit: r1578590 [1/3] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/ itests/qtest/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/jav...
Author: sershe
Date: Mon Mar 17 21:19:57 2014
New Revision: 1578590
URL: http://svn.apache.org/r1578590
Log:
HIVE-6578 : Use ORC file footer statistics through StatsProvidingRecordReader interface for analyze command (Prasanth J, reviewed by Sergey Shelukhin)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java
hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q
hive/trunk/ql/src/test/results/clientpositive/orc_analyze.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_analyze.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/itests/qtest/pom.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Mar 17 21:19:57 2014
@@ -527,7 +527,6 @@ public class HiveConf extends Configurat
true),
// Define the default compression codec for ORC file
HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
-
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
@@ -653,6 +652,10 @@ public class HiveConf extends Configurat
CLIENT_STATS_COUNTERS("hive.client.stats.counters", ""),
//Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used".
HIVE_STATS_RELIABLE("hive.stats.reliable", false),
+ // number of threads used by partialscan/noscan stats gathering for partitioned tables.
+ // This is applicable only for file formats that implement StatsProvidingRecordReader
+ // interface (like ORC)
+ HIVE_STATS_GATHER_NUM_THREADS("hive.stats.gather.num.threads", 10),
// Collect table access keys information for operators that can benefit from bucketing
HIVE_STATS_COLLECT_TABLEKEYS("hive.stats.collect.tablekeys", false),
// Collect column access information
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Mon Mar 17 21:19:57 2014
@@ -2368,6 +2368,15 @@
</property>
<property>
+ <name>hive.stats.gather.num.threads</name>
+ <value>10</value>
+ <description>
+ Number of threads used by partialscan/noscan analyze command for partitioned tables.
+ This is applicable only for file formats that implement StatsProvidingRecordReader (like ORC).
+ </description>
+</property>
+
+<property>
<name>hive.exec.orc.zerocopy</name>.
<value>false</value>
<description>
Modified: hive/trunk/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/qtest/pom.xml?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/itests/qtest/pom.xml (original)
+++ hive/trunk/itests/qtest/pom.xml Mon Mar 17 21:19:57 2014
@@ -39,7 +39,7 @@
<minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q</minimr.query.files>
<minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q</minimr.query.negative.files>
<minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q</minitez.query.files>
- <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
+ <minitez.query.files.shared>orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
<beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rena
me.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_o
verwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
</properties>
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java?rev=1578590&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java Mon Mar 17 21:19:57 2014
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
+import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * StatsNoJobTask is used in cases where stats collection is the only task for the given query (no
+ * parent MR or Tez job). It is used for ANALYZE with partialscan/noscan on file formats whose
+ * record reader implements StatsProvidingRecordReader: ORC, for example, stores column statistics
+ * for all columns in the file footer, so it's much faster to compute the table/partition
+ * statistics by reading the footer than by scanning all the rows. This task computes the basic
+ * stats numFiles, numRows, fileSize and rawDataSize.
+ *
+ * For a partitioned table every partition is processed by a worker of a fixed-size thread pool and
+ * the resulting partitions are written back to the metastore in one bulk call; an unpartitioned
+ * table is processed on the calling thread.
+ **/
+public class StatsNoJobTask extends Task<StatsNoJobWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static transient final Log LOG = LogFactory.getLog(StatsNoJobTask.class);
+
+  // Per-execution state. These are instance fields rather than statics so that two tasks running
+  // in the same JVM cannot overwrite each other's table, configuration or pending updates. They
+  // are transient because initialize()/execute() rebuild them.
+  private transient ConcurrentMap<String, Partition> partUpdates;
+  private transient Table table;
+  private transient String tableFullName;
+  private transient JobConf jc = null;
+
+  // Raised (by worker threads or by the pool shutdown logic) when the stats of at least one
+  // partition could not be gathered. A flag is used because ConcurrentMap rejects null values,
+  // so a failure cannot be recorded as a null entry in partUpdates.
+  private transient volatile boolean partitionStatsFailed;
+
+  public StatsNoJobTask() {
+    super();
+  }
+
+  @Override
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
+    jc = new JobConf(conf);
+  }
+
+  /**
+   * Looks up the target table, sets up the worker pool and gathers the statistics.
+   *
+   * @param driverContext the driver context (not used by this task)
+   * @return 0 on success; a non-zero value if stats could not be gathered and the work requires
+   *         reliable stats
+   */
+  @Override
+  public int execute(DriverContext driverContext) {
+
+    LOG.info("Executing stats (no job) task");
+
+    String tableName = "";
+    ExecutorService threadPool = null;
+    try {
+      tableName = work.getTableSpecs().tableName;
+      table = db.getTable(tableName);
+      // Executors.newFixedThreadPool rejects a non-positive size, so guard against a bad setting.
+      int numThreads = Math.max(1,
+          HiveConf.getIntVar(conf, ConfVars.HIVE_STATS_GATHER_NUM_THREADS));
+      tableFullName = table.getDbName() + "." + table.getTableName();
+      partitionStatsFailed = false;
+      threadPool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StatsNoJobTask-Thread-%d")
+              .build());
+      partUpdates = new MapMaker().concurrencyLevel(numThreads).makeMap();
+      LOG.info("Initialized threadpool for stats computation with " + numThreads + " threads");
+      return aggregateStats(threadPool);
+    } catch (HiveException e) {
+      LOG.error("Cannot get table " + tableName, e);
+      console.printError("Cannot get table " + tableName, e.toString());
+      // Without the table there is nothing to analyze; only fail the query if stats must be
+      // reliable.
+      return work.isStatsReliable() ? -1 : 0;
+    } finally {
+      // The pool is not needed past this point. This is a no-op when the partitioned path has
+      // already shut it down, and releases it on the unpartitioned path.
+      if (threadPool != null) {
+        threadPool.shutdownNow();
+      }
+    }
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.STATS;
+  }
+
+  @Override
+  public String getName() {
+    return "STATS-NO-JOB";
+  }
+
+  /**
+   * Worker that gathers the basic stats of a single partition and, on success, registers the
+   * updated partition in partUpdates keyed by the partition location.
+   */
+  class StatsCollection implements Runnable {
+
+    private final Partition partn;
+
+    public StatsCollection(Partition part) {
+      this.partn = part;
+    }
+
+    @Override
+    public void run() {
+
+      // the thrift partition object whose parameters receive the gathered stats
+      org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+      Map<String, String> parameters = tPart.getParameters();
+      String threadName = Thread.currentThread().getName();
+
+      try {
+        Path dir = new Path(tPart.getSd().getLocation());
+
+        if (collectBasicStats(dir, partn.getLocation(), partn, parameters)) {
+          partUpdates.put(tPart.getSd().getLocation(), new Partition(table, tPart));
+
+          // printout console and debug logs
+          String msg = "Partition " + tableFullName + partn.getSpec() + " stats: ["
+              + formatStats(parameters) + ']';
+          LOG.debug(threadName + ": " + msg);
+          console.printInfo(msg);
+        } else {
+          String msg = "Partition " + tableFullName + partn.getSpec() + " does not provide stats.";
+          LOG.debug(threadName + ": " + msg);
+        }
+      } catch (Exception e) {
+        console.printInfo("[Warning] could not update stats for " + tableFullName + partn.getSpec()
+            + ".",
+            "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
+
+        // Remember the failure; updatePartitions() fails the task by returning a non-zero value
+        // if stats are required to be reliable.
+        partitionStatsFailed = true;
+      }
+    }
+
+  }
+
+  /**
+   * Gathers the stats of the table named in the work: on this thread for an unpartitioned table,
+   * through the given pool (one job per partition) for a partitioned one.
+   *
+   * @param threadPool pool used to process the partitions of a partitioned table
+   * @return 0 on success, anything else indicates failure
+   */
+  private int aggregateStats(ExecutorService threadPool) {
+    int ret = 0;
+
+    try {
+      List<Partition> partitions = getPartitionsList();
+
+      if (partitions == null) {
+        // non-partitioned table
+        ret = updateTableStats();
+      } else {
+
+        // Partitioned table
+        for (Partition partn : partitions) {
+          threadPool.execute(new StatsCollection(partn));
+        }
+
+        LOG.debug("Stats collection waiting for threadpool to shutdown..");
+        shutdownAndAwaitTermination(threadPool);
+        LOG.debug("Stats collection threadpool shutdown successful.");
+
+        ret = updatePartitions();
+      }
+
+    } catch (Exception e) {
+      LOG.error("Failed to gather stats for " + tableFullName, e);
+      // Fail the query if the stats are supposed to be reliable
+      if (work.isStatsReliable()) {
+        ret = -1;
+      }
+    }
+
+    // The return value of 0 indicates success,
+    // anything else indicates failure
+    return ret;
+  }
+
+  /**
+   * Gathers the basic stats of the (unpartitioned) table and stores them in the metastore.
+   *
+   * @return 0 on success or when the files provide no stats; -1 if gathering failed and stats
+   *         are required to be reliable
+   */
+  private int updateTableStats() {
+    org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
+    Map<String, String> parameters = tTable.getParameters();
+    try {
+      Path dir = new Path(tTable.getSd().getLocation());
+
+      if (collectBasicStats(dir, table.getDataLocation().toString(), null, parameters)) {
+        db.alterTable(tableFullName, new Table(tTable));
+
+        String msg = "Table " + tableFullName + " stats: [" + formatStats(parameters) + ']';
+        LOG.debug(msg);
+        console.printInfo(msg);
+      } else {
+        String msg = "Table " + tableFullName + " does not provide stats.";
+        LOG.debug(msg);
+      }
+    } catch (Exception e) {
+      console.printInfo("[Warning] could not update stats for " + tableFullName + ".",
+          "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
+      // Same policy as for partitions: a failure only fails the task for reliable stats.
+      if (work.isStatsReliable()) {
+        return -1;
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Sums up the stats of all visible files directly under the given directory and, if at least
+   * one file provided stats, stores the totals in the given parameter map.
+   *
+   * Only files whose record reader implements StatsProvidingRecordReader contribute; every
+   * record reader is closed once its stats have been read.
+   *
+   * @param dir directory holding the data files
+   * @param splitLocation location string handed to the dummy split created for each file
+   * @param partn partition whose input format is used, or null to use the table's input format
+   * @param parameters table/partition parameters that receive the gathered stats
+   * @return true if stats were available and the parameters have been updated
+   */
+  private boolean collectBasicStats(Path dir, String splitLocation, Partition partn,
+      Map<String, String> parameters) throws java.io.IOException, HiveException {
+    long numRows = 0;
+    long rawDataSize = 0;
+    long fileSize = 0;
+    long numFiles = 0;
+    boolean statsAvailable = false;
+
+    FileSystem fs = dir.getFileSystem(conf);
+    Iterator<FileStatus> itr = ShimLoader.getHadoopShims().listLocatedStatus(fs, dir,
+        hiddenFileFilter);
+    while (itr.hasNext()) {
+      FileStatus file = itr.next();
+      if (file.isDir()) {
+        continue;
+      }
+
+      InputFormat<?, ?> inputFormat;
+      if (partn != null) {
+        inputFormat = (InputFormat<?, ?>) ReflectionUtils.newInstance(
+            partn.getInputFormatClass(), jc);
+      } else {
+        inputFormat = (InputFormat<?, ?>) ReflectionUtils.newInstance(
+            table.getInputFormatClass(), jc);
+      }
+
+      // A zero-length split is sufficient: only the reader's stats are used, no rows are read.
+      InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0,
+          new String[] { splitLocation });
+      org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat.getRecordReader(
+          dummySplit, jc, Reporter.NULL);
+      try {
+        if (recordReader instanceof StatsProvidingRecordReader) {
+          StatsProvidingRecordReader statsRR = (StatsProvidingRecordReader) recordReader;
+          rawDataSize += statsRR.getStats().getRawDataSize();
+          numRows += statsRR.getStats().getRowCount();
+          fileSize += file.getLen();
+          numFiles += 1;
+          statsAvailable = true;
+        }
+      } finally {
+        closeQuietly(recordReader);
+      }
+    }
+
+    if (statsAvailable) {
+      parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
+      parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
+      parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
+      parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
+      parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
+    }
+    return statsAvailable;
+  }
+
+  /**
+   * Closes the record reader; a failure to close is only logged because the stats have already
+   * been read at that point.
+   */
+  private static void closeQuietly(org.apache.hadoop.mapred.RecordReader<?, ?> recordReader) {
+    if (recordReader == null) {
+      return;
+    }
+    try {
+      recordReader.close();
+    } catch (java.io.IOException e) {
+      LOG.debug("Failed to close record reader", e);
+    }
+  }
+
+  /**
+   * Writes all successfully gathered partition stats to the metastore in one bulk call.
+   *
+   * @return -1 if stats of some partition are missing although they must be reliable, else 0
+   */
+  private int updatePartitions() throws InvalidOperationException, HiveException {
+    if (partitionStatsFailed && work.isStatsReliable()) {
+      LOG.debug("Stats requested to be reliable. Empty stats found and hence failing the task.");
+      return -1;
+    }
+
+    if (!partUpdates.isEmpty()) {
+      List<Partition> updatedParts = Lists.newArrayList(partUpdates.values());
+      LOG.debug("Bulk updating partitions..");
+      db.alterPartitions(tableFullName, updatedParts);
+      LOG.debug("Bulk updated " + updatedParts.size() + " partitions.");
+    }
+    return 0;
+  }
+
+  /**
+   * Stops accepting new jobs and waits for the submitted ones to finish. If they do not finish
+   * in time (or this thread is interrupted) the remaining jobs are cancelled and the collection
+   * is marked as failed, because some partitions then have no stats.
+   */
+  private void shutdownAndAwaitTermination(ExecutorService threadPool) {
+
+    // Disable new tasks from being submitted
+    threadPool.shutdown();
+    try {
+
+      // Wait a while for existing tasks to terminate
+      if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
+        // Cancel currently executing tasks; their partitions are left without stats
+        partitionStatsFailed = true;
+        threadPool.shutdownNow();
+
+        // Wait a while for tasks to respond to being cancelled
+        if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
+          LOG.warn("Stats collection thread pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+
+      // Cancel again if current thread also interrupted
+      partitionStatsFailed = true;
+      threadPool.shutdownNow();
+
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // Accepts every path whose name starts with neither "_" nor ".".
+  private static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Renders the supported stats present in the given parameters as "name=value, name=value".
+   */
+  private String formatStats(Map<String, String> parameters) {
+    StringBuilder builder = new StringBuilder();
+    for (String statType : StatsSetupConst.supportedStats) {
+      String value = parameters.get(statType);
+      if (value != null) {
+        if (builder.length() > 0) {
+          builder.append(", ");
+        }
+        builder.append(statType).append('=').append(value);
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Takes the table handle from the work's table spec and returns the partitions to analyze.
+   *
+   * @return the partitions of the table spec, or null if there is no table spec or the table is
+   *         not partitioned
+   */
+  private List<Partition> getPartitionsList() throws HiveException {
+    if (work.getTableSpecs() != null) {
+      tableSpec tblSpec = work.getTableSpecs();
+      table = tblSpec.tableHandle;
+      if (!table.isPartitioned()) {
+        return null;
+      } else {
+        return tblSpec.partitions;
+      }
+    }
+    return null;
+  }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Mon Mar 17 21:19:57 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.Fu
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -86,6 +87,7 @@ public final class TaskFactory {
MapredLocalTask.class));
taskvec.add(new TaskTuple<StatsWork>(StatsWork.class,
StatsTask.class));
+ taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
BlockMergeTask.class));
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordReader.java?rev=1578590&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordReader.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/StatsProvidingRecordReader.java Mon Mar 17 21:19:57 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.serde2.SerDeStats;
+
+/**
+ * StatsProvidingRecordReader is the contract through which a file format that gathers
+ * statistics internally (like ORC) exposes them on the reader side. Such reader side
+ * statistics are useful for updating the metastore with table/partition level statistics
+ * using the analyze command.
+ */
+public interface StatsProvidingRecordReader {
+
+  /**
+   * Returns the statistics information.
+   *
+   * @return SerDeStats carrying the statistics
+   */
+  SerDeStats getStats();
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Mar 17 21:19:57 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.hive.ql.io.orc.Metadata;
import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hive.ql.io.sarg
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
@@ -99,21 +101,26 @@ public class OrcInputFormat implements
private static final double MIN_INCLUDED_LOCATION = 0.80;
private static class OrcRecordReader
- implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> {
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>,
+ StatsProvidingRecordReader {
private final RecordReader reader;
private final long offset;
private final long length;
private final int numColumns;
private float progress = 0.0f;
+ private final Reader file;
+ private final SerDeStats stats;
OrcRecordReader(Reader file, Configuration conf,
long offset, long length) throws IOException {
List<OrcProto.Type> types = file.getTypes();
+ this.file = file;
numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
this.reader = createReaderFromFile(file, conf, offset, length);
this.offset = offset;
this.length = length;
+ this.stats = new SerDeStats();
}
@Override
@@ -151,6 +158,13 @@ public class OrcInputFormat implements
public float getProgress() throws IOException {
return progress;
}
+
+ @Override
+ public SerDeStats getStats() {
+ stats.setRawDataSize(file.getRawDataSize());
+ stats.setRowCount(file.getNumberOfRows());
+ return stats;
+ }
}
static RecordReader createReaderFromFile(
@@ -867,33 +881,6 @@ public class OrcInputFormat implements
return true;
}
- private Object getMax(ColumnStatistics index) {
- if (index instanceof IntegerColumnStatistics) {
- return ((IntegerColumnStatistics) index).getMaximum();
- } else if (index instanceof DoubleColumnStatistics) {
- return ((DoubleColumnStatistics) index).getMaximum();
- } else if (index instanceof StringColumnStatistics) {
- return ((StringColumnStatistics) index).getMaximum();
- } else if (index instanceof DateColumnStatistics) {
- return ((DateColumnStatistics) index).getMaximum();
- } else {
- return null;
- }
- }
-
- private Object getMin(ColumnStatistics index) {
- if (index instanceof IntegerColumnStatistics) {
- return ((IntegerColumnStatistics) index).getMinimum();
- } else if (index instanceof DoubleColumnStatistics) {
- return ((DoubleColumnStatistics) index).getMinimum();
- } else if (index instanceof StringColumnStatistics) {
- return ((StringColumnStatistics) index).getMinimum();
- } else if (index instanceof DateColumnStatistics) {
- return ((DateColumnStatistics) index).getMinimum();
- } else {
- return null;
- }
- }
}
static List<Context.FileSplitInfo> generateSplitsInfo(Configuration conf)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Mon Mar 17 21:19:57 2014
@@ -531,6 +531,8 @@ final class ReaderImpl implements Reader
// statistics is not required as protocol buffers takes care of it.
return colStat.getBinaryStatistics().getSum();
case STRING:
+ case CHAR:
+ case VARCHAR:
// old orc format doesn't support sum for string statistics. checking for
// existence is not required as protocol buffers takes care of it.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Mon Mar 17 21:19:57 2014
@@ -20,24 +20,20 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.lang.StringBuffer;
import java.util.Stack;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -45,14 +41,16 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.mapred.InputFormat;
+
/**
* Processor for the rule - table scan.
*/
@@ -62,7 +60,6 @@ public class GenMRTableScan1 implements
/**
* Table Sink encountered.
- *
* @param nd
* the table sink operator encountered
* @param opProcCtx
@@ -73,6 +70,8 @@ public class GenMRTableScan1 implements
TableScanOperator op = (TableScanOperator) nd;
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
+ Class<? extends InputFormat> inputFormat = parseCtx.getTopToTable().get(op)
+ .getInputFormatClass();
Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
// create a dummy MapReduce task
@@ -91,52 +90,71 @@ public class GenMRTableScan1 implements
QBParseInfo parseInfo = parseCtx.getQB().getParseInfo();
if (parseInfo.isAnalyzeCommand()) {
-
- // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS;
- // The plan consists of a simple MapRedTask followed by a StatsTask.
- // The MR task is just a simple TableScanOperator
-
- StatsWork statsWork = new StatsWork(parseCtx.getQB().getParseInfo().getTableSpec());
- statsWork.setAggKey(op.getConf().getStatsAggPrefix());
- statsWork.setSourceTask(currTask);
- statsWork.setStatsReliable(
- parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseCtx.getConf());
- currTask.addDependentTask(statsTask);
- if (!ctx.getRootTasks().contains(currTask)) {
- ctx.getRootTasks().add(currTask);
- }
-
- // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
- // The plan consists of a StatsTask only.
- if (parseInfo.isNoScanAnalyzeCommand()) {
- statsTask.setParentTasks(null);
- statsWork.setNoScanAnalyzeCommand(true);
- ctx.getRootTasks().remove(currTask);
- ctx.getRootTasks().add(statsTask);
- }
-
- // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan;
- if (parseInfo.isPartialScanAnalyzeCommand()) {
- handlePartialScanCommand(op, ctx, parseCtx, currTask, parseInfo, statsWork, statsTask);
- }
-
- currWork.getMapWork().setGatheringStats(true);
- if (currWork.getReduceWork() != null) {
- currWork.getReduceWork().setGatheringStats(true);
- }
-
- // NOTE: here we should use the new partition predicate pushdown API to get a list of pruned list,
- // and pass it to setTaskPlan as the last parameter
- Set<Partition> confirmedPartns = GenMapRedUtils.getConfirmedPartitionsForScan(parseInfo);
- if (confirmedPartns.size() > 0) {
- Table source = parseCtx.getQB().getMetaData().getTableForAlias(alias);
- PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, false);
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
- } else { // non-partitioned table
- GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
+ boolean partialScan = parseInfo.isPartialScanAnalyzeCommand();
+ boolean noScan = parseInfo.isNoScanAnalyzeCommand();
+ if (inputFormat.equals(OrcInputFormat.class) && (noScan || partialScan)) {
+
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan;
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
+ // There will not be any MR or Tez job above this task
+ StatsNoJobWork snjWork = new StatsNoJobWork(parseCtx.getQB().getParseInfo().getTableSpec());
+ snjWork.setStatsReliable(parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+ Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseCtx.getConf());
+ ctx.setCurrTask(snjTask);
+ ctx.setCurrTopOp(null);
+ ctx.getRootTasks().clear();
+ ctx.getRootTasks().add(snjTask);
+ } else {
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS;
+ // The plan consists of a simple MapRedTask followed by a StatsTask.
+ // The MR task is just a simple TableScanOperator
+
+ StatsWork statsWork = new StatsWork(parseCtx.getQB().getParseInfo().getTableSpec());
+ statsWork.setAggKey(op.getConf().getStatsAggPrefix());
+ statsWork.setSourceTask(currTask);
+ statsWork.setStatsReliable(parseCtx.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+ Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseCtx.getConf());
+ currTask.addDependentTask(statsTask);
+ if (!ctx.getRootTasks().contains(currTask)) {
+ ctx.getRootTasks().add(currTask);
+ }
+
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
+ // The plan consists of a StatsTask only.
+ if (parseInfo.isNoScanAnalyzeCommand()) {
+ statsTask.setParentTasks(null);
+ statsWork.setNoScanAnalyzeCommand(true);
+ ctx.getRootTasks().remove(currTask);
+ ctx.getRootTasks().add(statsTask);
+ }
+
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan;
+ if (parseInfo.isPartialScanAnalyzeCommand()) {
+ handlePartialScanCommand(op, ctx, parseCtx, currTask, parseInfo, statsWork, statsTask);
+ }
+
+ currWork.getMapWork().setGatheringStats(true);
+ if (currWork.getReduceWork() != null) {
+ currWork.getReduceWork().setGatheringStats(true);
+ }
+
+ // NOTE: here we should use the new partition predicate pushdown API to get the
+ // list of pruned partitions,
+ // and pass it to setTaskPlan as the last parameter
+ Set<Partition> confirmedPartns = GenMapRedUtils
+ .getConfirmedPartitionsForScan(parseInfo);
+ if (confirmedPartns.size() > 0) {
+ Table source = parseCtx.getQB().getMetaData().getTableForAlias(alias);
+ PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, false);
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx, partList);
+ } else { // non-partitioned table
+ GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, currTask, false, ctx);
+ }
}
}
+
return true;
}
}
@@ -145,9 +163,7 @@ public class GenMRTableScan1 implements
}
/**
- * handle partial scan command.
- *
- * It is composed of PartialScanTask followed by StatsTask .
+ * Handle the partial scan command: a PartialScanTask followed by a StatsTask.
* @param op
* @param ctx
* @param parseCtx
@@ -158,12 +174,12 @@ public class GenMRTableScan1 implements
* @throws SemanticException
*/
private void handlePartialScanCommand(TableScanOperator op, GenMRProcContext ctx,
- ParseContext parseCtx,
- Task<? extends Serializable> currTask, QBParseInfo parseInfo, StatsWork statsWork,
- Task<StatsWork> statsTask) throws SemanticException {
+ ParseContext parseCtx, Task<? extends Serializable> currTask, QBParseInfo parseInfo,
+ StatsWork statsWork, Task<StatsWork> statsTask) throws SemanticException {
String aggregationKey = op.getConf().getStatsAggPrefix();
StringBuffer aggregationKeyBuffer = new StringBuffer(aggregationKey);
- List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(parseInfo, aggregationKeyBuffer);
+ List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(parseInfo,
+ aggregationKeyBuffer);
aggregationKey = aggregationKeyBuffer.toString();
// scan work
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java Mon Mar 17 21:19:57 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -46,8 +47,10 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.QBParseInfo;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.mapred.InputFormat;
/**
* ProcessAnalyzeTable sets up work for the several variants of analyze table
@@ -79,6 +82,8 @@ public class ProcessAnalyzeTable impleme
TableScanOperator tableScan = (TableScanOperator) nd;
ParseContext parseContext = context.parseContext;
+ Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
+ .getInputFormatClass();
QB queryBlock = parseContext.getQB();
QBParseInfo parseInfo = parseContext.getQB().getParseInfo();
@@ -97,6 +102,22 @@ public class ProcessAnalyzeTable impleme
assert alias != null;
TezWork tezWork = context.currentTask.getWork();
+ boolean partialScan = parseInfo.isPartialScanAnalyzeCommand();
+ boolean noScan = parseInfo.isNoScanAnalyzeCommand();
+ if (inputFormat.equals(OrcInputFormat.class) && (noScan || partialScan)) {
+
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS partialscan;
+ // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
+ // There will not be any Tez job above this task
+ StatsNoJobWork snjWork = new StatsNoJobWork(parseContext.getQB().getParseInfo().getTableSpec());
+ snjWork.setStatsReliable(parseContext.getConf().getBoolVar(
+ HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+ Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseContext.getConf());
+ snjTask.setParentTasks(null);
+ context.rootTasks.remove(context.currentTask);
+ context.rootTasks.add(snjTask);
+ return true;
+ } else {
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS;
// The plan consists of a simple TezTask followed by a StatsTask.
@@ -136,6 +157,7 @@ public class ProcessAnalyzeTable impleme
w.setGatheringStats(true);
return true;
+ }
}
return null;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1578590&r1=1578589&r2=1578590&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Mar 17 21:19:57 2014
@@ -86,6 +86,7 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.NullRowsInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -1311,6 +1312,7 @@ public class SemanticAnalyzer extends Ba
Class<? extends InputFormat> inputFormatClass = null;
switch (ts.specType) {
case TABLE_ONLY:
+ case DYNAMIC_PARTITION:
inputFormatClass = ts.tableHandle.getInputFormatClass();
break;
case STATIC_PARTITION:
@@ -1319,8 +1321,9 @@ public class SemanticAnalyzer extends Ba
default:
assert false;
}
- // throw a HiveException for non-rcfile.
- if (!inputFormatClass.equals(RCFileInputFormat.class)) {
+ // throw a SemanticException for formats other than rcfile or orcfile.
+ if (!(inputFormatClass.equals(RCFileInputFormat.class) || inputFormatClass
+ .equals(OrcInputFormat.class))) {
throw new SemanticException(ErrorMsg.ANALYZE_TABLE_PARTIALSCAN_NON_RCFILE.getMsg());
}
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java?rev=1578590&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java Mon Mar 17 21:19:57 2014
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
+
+/**
+ * Work descriptor for the client-side stats aggregator task, which runs without an MR or Tez job.
+ */
+@Explain(displayName = "Stats-Aggr Operator")
+public class StatsNoJobWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private tableSpec tableSpecs;
+ private boolean statsReliable;
+
+ public StatsNoJobWork() {
+ }
+
+ public StatsNoJobWork(tableSpec tableSpecs) {
+ this.tableSpecs = tableSpecs;
+ }
+
+ public StatsNoJobWork(boolean statsReliable) {
+ this.statsReliable = statsReliable;
+ }
+
+ public tableSpec getTableSpecs() {
+ return tableSpecs;
+ }
+
+ public boolean isStatsReliable() {
+ return statsReliable;
+ }
+
+ public void setStatsReliable(boolean statsReliable) {
+ this.statsReliable = statsReliable;
+ }
+}
Added: hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q?rev=1578590&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_analyze.q Mon Mar 17 21:19:57 2014
@@ -0,0 +1,155 @@
+CREATE TABLE orc_create_people_staging (
+ id int,
+ first_name string,
+ last_name string,
+ address string,
+ state string);
+
+LOAD DATA LOCAL INPATH '../../data/files/orc_create_people.txt' OVERWRITE INTO TABLE orc_create_people_staging;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+set hive.stats.autogather=false;
+-- non-partitioned table
+-- partial scan gather
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string,
+ state string)
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging;
+
+set hive.stats.autogather = true;
+analyze table orc_create_people compute statistics partialscan;
+
+desc formatted orc_create_people;
+
+drop table orc_create_people;
+
+-- auto stats gather
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string,
+ state string)
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people SELECT * FROM orc_create_people_staging;
+
+desc formatted orc_create_people;
+
+drop table orc_create_people;
+
+set hive.stats.autogather=false;
+-- partitioned table
+-- partial scan gather
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string)
+PARTITIONED BY (state string)
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
+ SELECT * FROM orc_create_people_staging;
+
+set hive.stats.autogather = true;
+analyze table orc_create_people partition(state) compute statistics partialscan;
+
+desc formatted orc_create_people partition(state="Ca");
+desc formatted orc_create_people partition(state="Or");
+
+drop table orc_create_people;
+
+-- auto stats gather
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string)
+PARTITIONED BY (state string)
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
+ SELECT * FROM orc_create_people_staging;
+
+desc formatted orc_create_people partition(state="Ca");
+desc formatted orc_create_people partition(state="Or");
+
+drop table orc_create_people;
+
+set hive.stats.autogather=false;
+-- partitioned and bucketed table
+-- partial scan gather
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string)
+PARTITIONED BY (state string)
+clustered by (first_name)
+sorted by (last_name)
+into 4 buckets
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
+ SELECT * FROM orc_create_people_staging;
+
+set hive.stats.autogather = true;
+analyze table orc_create_people partition(state) compute statistics partialscan;
+
+desc formatted orc_create_people partition(state="Ca");
+desc formatted orc_create_people partition(state="Or");
+
+drop table orc_create_people;
+
+-- auto stats gather
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string)
+PARTITIONED BY (state string)
+clustered by (first_name)
+sorted by (last_name)
+into 4 buckets
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
+ SELECT * FROM orc_create_people_staging;
+
+desc formatted orc_create_people partition(state="Ca");
+desc formatted orc_create_people partition(state="Or");
+
+drop table orc_create_people;
+
+set hive.stats.autogather=false;
+-- create table with partitions containing text and ORC files.
+-- ORC files implement StatsProvidingRecordReader but text files do not.
+-- So the partition containing text file should not have statistics.
+CREATE TABLE orc_create_people (
+ id int,
+ first_name string,
+ last_name string,
+ address string)
+PARTITIONED BY (state string)
+STORED AS orc;
+
+INSERT OVERWRITE TABLE orc_create_people PARTITION (state)
+ SELECT * FROM orc_create_people_staging;
+
+ALTER TABLE orc_create_people ADD PARTITION(state="OH");
+LOAD DATA LOCAL INPATH '../../data/files/kv1.txt' OVERWRITE INTO TABLE orc_create_people PARTITION(state="OH");
+
+set hive.stats.autogather = true;
+analyze table orc_create_people partition(state) compute statistics noscan;
+
+desc formatted orc_create_people partition(state="Ca");
+desc formatted orc_create_people partition(state="OH");
+
+drop table orc_create_people;