Posted to commits@hive.apache.org by ha...@apache.org on 2017/11/07 22:33:57 UTC
[20/22] hive git commit: HIVE-16827 : Merge stats task and column stats task into a single task (Zoltan Haindrich via Ashutosh Chauhan)
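The recurring pattern in the hunks below is that the old standalone StatsWork (and ColumnStatsWork) construction is replaced by wrapping a BasicStatsWork inside the new StatsWork, so a single StatsTask can carry both basic table stats and column stats. The following is a minimal sketch of that pattern, distilled from the DDLSemanticAnalyzer and LoadSemanticAnalyzer hunks; it assumes the post-patch Hive ql classes are on the classpath, and the helper class and method names are illustrative only, not part of the patch.

    import java.io.Serializable;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    import org.apache.hadoop.hive.ql.plan.StatsWork;

    // Illustrative helper (not part of the patch) showing the consolidated stats setup.
    public final class StatsTaskSketch {

      private StatsTaskSketch() {
      }

      public static Task<? extends Serializable> buildStatsTask(Table table, LoadTableDesc ltd,
          HiveConf conf) {
        // Basic (table-level) stats work, configured as in the DDL/Load analyzer hunks.
        BasicStatsWork basicStatsWork = new BasicStatsWork(ltd);
        basicStatsWork.setNoStatsAggregator(true);
        basicStatsWork.setClearAggregatorStats(true);
        // The new StatsWork(table, basicStatsWork, conf) wrapper replaces the separate
        // StatsWork/ColumnStatsWork tasks used before this change; a column stats
        // descriptor can later be attached to the same work (see TaskCompiler below).
        StatsWork statsWork = new StatsWork(table, basicStatsWork, conf);
        return TaskFactory.get(statsWork, conf);
      }
    }

In the patch itself the resulting task is then chained after the move task (moveTsk.addDependentTask(statTask)), so stats are gathered once the data movement completes.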
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index dbf4b8d..d86162b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -33,7 +33,6 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.BlobStorageUtils;
@@ -71,7 +70,6 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -85,16 +83,17 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
-import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -122,6 +121,7 @@ import org.apache.hadoop.mapred.InputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Interner;
@@ -496,6 +496,10 @@ public final class GenMapRedUtils {
Path tblDir = null;
plan.setNameToSplitSample(parseCtx.getNameToSplitSample());
+ // we also collect table stats while collecting column stats.
+ if (parseCtx.getAnalyzeRewrite() != null) {
+ plan.setGatheringStats(true);
+ }
if (partsList == null) {
try {
@@ -1480,17 +1484,34 @@ public final class GenMapRedUtils {
Task<? extends Serializable> currTask, HiveConf hconf) {
MoveWork mvWork = mvTask.getWork();
- StatsWork statsWork = null;
+ BasicStatsWork statsWork = null;
+ Table table = null;
+ boolean truncate = false;
if (mvWork.getLoadTableWork() != null) {
- statsWork = new StatsWork(mvWork.getLoadTableWork());
+ statsWork = new BasicStatsWork(mvWork.getLoadTableWork());
+ String tableName = mvWork.getLoadTableWork().getTable().getTableName();
+ truncate = mvWork.getLoadTableWork().getReplace();
+ try {
+ table = Hive.get().getTable(SessionState.get().getCurrentDatabase(), tableName);
+ } catch (HiveException e) {
+ throw new RuntimeException("unexpected; table should be present already..: " + tableName, e);
+ }
} else if (mvWork.getLoadFileWork() != null) {
- statsWork = new StatsWork(mvWork.getLoadFileWork());
+ statsWork = new BasicStatsWork(mvWork.getLoadFileWork());
+
+ truncate = true;
+ if (mvWork.getLoadFileWork().getCtasCreateTableDesc() == null) {
+ throw new RuntimeException("unexpected; this should be a CTAS - however no desc present");
+ }
+ try {
+ table = mvWork.getLoadFileWork().getCtasCreateTableDesc().toTable(hconf);
+ } catch (HiveException e) {
+ LOG.debug("can't pre-create table", e);
+ table = null;
+ }
}
assert statsWork != null : "Error when generating StatsTask";
- statsWork.setSourceTask(currTask);
- statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
- statsWork.setStatsTmpDir(nd.getConf().getStatsTmpDir());
if (currTask.getWork() instanceof MapredWork) {
MapredWork mrWork = (MapredWork) currTask.getWork();
mrWork.getMapWork().setGatheringStats(true);
@@ -1509,10 +1530,13 @@ public final class GenMapRedUtils {
}
}
- // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
- // in FileSinkDesc is used for stats publishing. They should be consistent.
- statsWork.setAggKey(nd.getConf().getStatsAggPrefix());
- Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
+ StatsWork columnStatsWork = new StatsWork(table, statsWork, hconf);
+ columnStatsWork.collectStatsFromAggregator(nd.getConf());
+
+ columnStatsWork.truncateExisting(truncate);
+
+ columnStatsWork.setSourceTask(currTask);
+ Task<? extends Serializable> statsTask = TaskFactory.get(columnStatsWork, hconf);
// subscribe feeds from the MoveTask so that MoveTask can forward the list
// of dynamic partition list to the StatsTask
@@ -1579,7 +1603,7 @@ public final class GenMapRedUtils {
*/
public static MapWork createMergeTask(FileSinkDesc fsInputDesc, Path finalName,
boolean hasDynamicPartitions, CompilationOpContext ctx) throws SemanticException {
-
+
Path inputDir = fsInputDesc.getMergeInputDirName();
TableDesc tblDesc = fsInputDesc.getTableInfo();
@@ -1860,7 +1884,9 @@ public final class GenMapRedUtils {
public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf,
FileSinkOperator fsOp, Task<? extends Serializable> currTask, boolean isInsertTable) {
// Has the user enabled merging of files for map-only jobs or for all jobs
- if (mvTasks == null || mvTasks.isEmpty()) return false;
+ if (mvTasks == null || mvTasks.isEmpty()) {
+ return false;
+ }
// no need of merging if the move is to a local file system
// We are looking based on the original FSOP, so use the original path as is.
@@ -1878,7 +1904,9 @@ public final class GenMapRedUtils {
}
}
- if (mvTask == null || mvTask.isLocal() || !fsOp.getConf().canBeMerged()) return false;
+ if (mvTask == null || mvTask.isLocal() || !fsOp.getConf().canBeMerged()) {
+ return false;
+ }
if (currTask.getWork() instanceof TezWork) {
// tez blurs the boundary between map and reduce, thus it has it's own config
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
index 91c6c00..624eb6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.optimizer;
+import java.util.List;
+import java.util.Set;
+
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -30,8 +33,9 @@ import org.apache.hadoop.hive.ql.parse.GenTezWork;
import org.apache.hadoop.hive.ql.parse.spark.GenSparkWork;
import org.apache.hadoop.hive.ql.plan.ArchiveWork;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsNoJobWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -45,15 +49,11 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Set;
-
/**
* Finds Acid FileSinkDesc objects which can be created in the physical (disconnected) plan, e.g.
* {@link org.apache.hadoop.hive.ql.parse.GenTezUtils#removeUnionOperators(GenTezProcContext, BaseWork, int)}
@@ -92,7 +92,7 @@ public class QueryPlanPostProcessor {
}
else if(work instanceof MapredLocalWork) {
//I don't think this can have any FileSinkOperatorS - more future proofing
- Set<FileSinkOperator> fileSinkOperatorSet = OperatorUtils.findOperators(((MapredLocalWork)work).getAliasToWork().values(), FileSinkOperator.class);
+ Set<FileSinkOperator> fileSinkOperatorSet = OperatorUtils.findOperators(((MapredLocalWork) work).getAliasToWork().values(), FileSinkOperator.class);
for(FileSinkOperator fsop : fileSinkOperatorSet) {
collectFileSinkDescs(fsop, acidSinks);
}
@@ -100,42 +100,6 @@ public class QueryPlanPostProcessor {
else if(work instanceof ExplainWork) {
new QueryPlanPostProcessor(((ExplainWork)work).getRootTasks(), acidSinks, executionId);
}
- /*
- ekoifman:~ ekoifman$ cd dev/hiverwgit/ql/src/java/org/apache/
-ekoifman:apache ekoifman$ find . -name *Work.java
-./hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
-./hadoop/hive/ql/exec/repl/ReplDumpWork.java
-./hadoop/hive/ql/exec/repl/ReplStateLogWork.java
-./hadoop/hive/ql/index/IndexMetadataChangeWork.java
-./hadoop/hive/ql/io/merge/MergeFileWork.java - extends MapWork
-./hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java - extends MapWork
-./hadoop/hive/ql/parse/GenTezWork.java
-./hadoop/hive/ql/parse/spark/GenSparkWork.java
-./hadoop/hive/ql/plan/ArchiveWork.java
-./hadoop/hive/ql/plan/BaseWork.java
-./hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
-./hadoop/hive/ql/plan/ColumnStatsWork.java
-./hadoop/hive/ql/plan/ConditionalWork.java
-./hadoop/hive/ql/plan/CopyWork.java
-./hadoop/hive/ql/plan/DDLWork.java
-./hadoop/hive/ql/plan/DependencyCollectionWork.java
-./hadoop/hive/ql/plan/ExplainSQRewriteWork.java
-./hadoop/hive/ql/plan/ExplainWork.java
-./hadoop/hive/ql/plan/FetchWork.java
-./hadoop/hive/ql/plan/FunctionWork.java
-./hadoop/hive/ql/plan/MapredLocalWork.java
-./hadoop/hive/ql/plan/MapredWork.java
-./hadoop/hive/ql/plan/MapWork.java - extends BaseWork
-./hadoop/hive/ql/plan/MergeJoinWork.java - extends BaseWork
-./hadoop/hive/ql/plan/MoveWork.java
-./hadoop/hive/ql/plan/ReduceWork.java
-./hadoop/hive/ql/plan/ReplCopyWork.java - extends CopyWork
-./hadoop/hive/ql/plan/SparkWork.java
-./hadoop/hive/ql/plan/StatsNoJobWork.java
-./hadoop/hive/ql/plan/StatsWork.java
-./hadoop/hive/ql/plan/TezWork.java
-./hadoop/hive/ql/plan/UnionWork.java - extends BaseWork
- */
else if(work instanceof ReplLoadWork ||
work instanceof ReplStateLogWork ||
work instanceof IndexMetadataChangeWork ||
@@ -143,7 +107,7 @@ ekoifman:apache ekoifman$ find . -name *Work.java
work instanceof GenSparkWork ||
work instanceof ArchiveWork ||
work instanceof ColumnStatsUpdateWork ||
- work instanceof ColumnStatsWork ||
+ work instanceof BasicStatsWork ||
work instanceof ConditionalWork ||
work instanceof CopyWork ||
work instanceof DDLWork ||
@@ -152,7 +116,7 @@ ekoifman:apache ekoifman$ find . -name *Work.java
work instanceof FetchWork ||
work instanceof FunctionWork ||
work instanceof MoveWork ||
- work instanceof StatsNoJobWork ||
+ work instanceof BasicStatsNoJobWork ||
work instanceof StatsWork) {
LOG.debug("Found " + work.getClass().getName() + " - no FileSinkOperation can be present. executionId=" + executionId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
index 3a20cfe..bf7a644 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MemoryDecider.java
@@ -22,11 +22,9 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.Stack;
@@ -35,12 +33,9 @@ import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -58,8 +53,6 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -174,6 +167,7 @@ public class MemoryDecider implements PhysicalPlanResolver {
}
Comparator<MapJoinOperator> comp = new Comparator<MapJoinOperator>() {
+ @Override
public int compare(MapJoinOperator mj1, MapJoinOperator mj2) {
if (mj1 == null || mj2 == null) {
throw new NullPointerException();
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
index dc433fe..6a0ca5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
@@ -26,8 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
index 2f9783e..e1df60b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
@@ -86,7 +86,9 @@ public class SkewJoinResolver implements PhysicalPlanResolver {
ParseContext pc = physicalContext.getParseContext();
if (pc.getLoadTableWork() != null) {
for (LoadTableDesc ltd : pc.getLoadTableWork()) {
- if (!ltd.isMmTable()) continue;
+ if (!ltd.isMmTable()) {
+ continue;
+ }
// See the path in FSOP that calls fs.exists on finalPath.
LOG.debug("Not using skew join because the destination table "
+ ltd.getTable().getTableName() + " is an insert_only table");
@@ -95,9 +97,10 @@ public class SkewJoinResolver implements PhysicalPlanResolver {
}
if (pc.getLoadFileWork() != null) {
for (LoadFileDesc lfd : pc.getLoadFileWork()) {
- if (!lfd.isMmCtas()) continue;
- LOG.debug("Not using skew join because the destination table "
- + lfd.getDestinationCreateTable() + " is an insert_only table");
+ if (!lfd.isMmCtas()) {
+ continue;
+ }
+ LOG.debug("Not using skew join because the destination table is an insert_only table");
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 0f7ef8b..e9ae590 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -369,10 +368,10 @@ public abstract class BaseSemanticAnalyzer {
String dbName = dbTablePair.getKey();
String tableName = dbTablePair.getValue();
if (dbName != null){
- return StatsUtils.getFullyQualifiedTableName(dbName, tableName);
+ return dbName + "." + tableName;
}
if (currentDatabase != null) {
- return StatsUtils.getFullyQualifiedTableName(currentDatabase, tableName);
+ return currentDatabase + "." + tableName;
}
return tableName;
} else if (tokenType == HiveParser.StringLiteral) {
@@ -1120,19 +1119,22 @@ public abstract class BaseSemanticAnalyzer {
public TableSpec(Table table) {
tableHandle = table;
- tableName = table.getFullyQualifiedName();
+ tableName = table.getDbName() + "." + table.getTableName();
specType = SpecType.TABLE_ONLY;
}
public TableSpec(Hive db, String tableName, Map<String, String> partSpec)
throws HiveException {
Table table = db.getTable(tableName);
- final Partition partition = partSpec == null ? null : db.getPartition(table, partSpec, false);
tableHandle = table;
- this.tableName = table.getFullyQualifiedName();
- if (partition == null) {
+ this.tableName = table.getDbName() + "." + table.getTableName();
+ if (partSpec == null) {
specType = SpecType.TABLE_ONLY;
} else {
+ Partition partition = db.getPartition(table, partSpec, false);
+ if (partition == null) {
+ throw new SemanticException("partition is unknown: " + table + "/" + partSpec);
+ }
partHandle = partition;
partitions = Collections.singletonList(partHandle);
specType = SpecType.STATIC_PARTITION;
@@ -1733,7 +1735,9 @@ public abstract class BaseSemanticAnalyzer {
@VisibleForTesting
static void normalizeColSpec(Map<String, String> partSpec, String colName,
String colType, String originalColSpec, Object colValue) throws SemanticException {
- if (colValue == null) return; // nothing to do with nulls
+ if (colValue == null) {
+ return; // nothing to do with nulls
+ }
String normalizedColSpec = originalColSpec;
if (colType.equals(serdeConstants.DATE_TYPE_NAME)) {
normalizedColSpec = normalizeDateCol(colValue, originalColSpec);
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 3415a23..29b904e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -93,8 +93,10 @@ import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition;
import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
import org.apache.hadoop.hive.ql.plan.AlterWMTriggerDesc;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.plan.CacheMetadataDesc;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateIndexDesc;
import org.apache.hadoop.hive.ql.plan.CreateResourcePlanDesc;
@@ -140,7 +142,6 @@ import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
import org.apache.hadoop.hive.ql.plan.ShowTblPropertiesDesc;
import org.apache.hadoop.hive.ql.plan.ShowTxnsDesc;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.SwitchDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
@@ -1313,18 +1314,19 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
// Recalculate the HDFS stats if auto gather stats is set
if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- StatsWork statDesc;
+ BasicStatsWork basicStatsWork;
if (oldTblPartLoc.equals(newTblPartLoc)) {
// If we're merging to the same location, we can avoid some metastore calls
TableSpec tablepart = new TableSpec(this.db, conf, root);
- statDesc = new StatsWork(tablepart);
+ basicStatsWork = new BasicStatsWork(tablepart);
} else {
- statDesc = new StatsWork(ltd);
+ basicStatsWork = new BasicStatsWork(ltd);
}
- statDesc.setNoStatsAggregator(true);
- statDesc.setClearAggregatorStats(true);
- statDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- Task<? extends Serializable> statTask = TaskFactory.get(statDesc, conf);
+ basicStatsWork.setNoStatsAggregator(true);
+ basicStatsWork.setClearAggregatorStats(true);
+ StatsWork columnStatsWork = new StatsWork(table, basicStatsWork, conf);
+
+ Task<? extends Serializable> statTask = TaskFactory.get(columnStatsWork, conf);
moveTsk.addDependentTask(statTask);
}
} catch (HiveException e) {
@@ -1657,7 +1659,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
alterTblDesc.setEnvironmentContext(environmentContext);
alterTblDesc.setOldName(tableName);
- boolean isPotentialMmSwitch = mapProp.containsKey(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL)
+ boolean isPotentialMmSwitch = AcidUtils.isTablePropertyTransactional(mapProp)
|| mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isPotentialMmSwitch);
@@ -1972,18 +1974,19 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
mergeTask.addDependentTask(moveTsk);
if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- StatsWork statDesc;
+ BasicStatsWork basicStatsWork;
if (oldTblPartLoc.equals(newTblPartLoc)) {
// If we're merging to the same location, we can avoid some metastore calls
TableSpec tableSpec = new TableSpec(db, tableName, partSpec);
- statDesc = new StatsWork(tableSpec);
+ basicStatsWork = new BasicStatsWork(tableSpec);
} else {
- statDesc = new StatsWork(ltd);
+ basicStatsWork = new BasicStatsWork(ltd);
}
- statDesc.setNoStatsAggregator(true);
- statDesc.setClearAggregatorStats(true);
- statDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- Task<? extends Serializable> statTask = TaskFactory.get(statDesc, conf);
+ basicStatsWork.setNoStatsAggregator(true);
+ basicStatsWork.setClearAggregatorStats(true);
+ StatsWork columnStatsWork = new StatsWork(tblObj, basicStatsWork, conf);
+
+ Task<? extends Serializable> statTask = TaskFactory.get(columnStatsWork, conf);
moveTsk.addDependentTask(statTask);
}
@@ -2085,7 +2088,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
switch (child.getToken().getType()) {
case HiveParser.TOK_UNIQUE:
BaseSemanticAnalyzer.processUniqueConstraints(qualifiedTabName[0], qualifiedTabName[1],
- child, uniqueConstraints);
+ child, uniqueConstraints);
break;
case HiveParser.TOK_PRIMARY_KEY:
BaseSemanticAnalyzer.processPrimaryKeys(qualifiedTabName[0], qualifiedTabName[1],
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index 7a0d4a7..065c7e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.antlr.runtime.TokenRewriteStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,17 +39,12 @@ import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
import org.apache.hadoop.hive.ql.plan.ExplainWork;
-import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 01cb2b3..b6f1139 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -205,10 +205,6 @@ public class GenTezUtils {
// All the setup is done in GenMapRedUtils
GenMapRedUtils.setMapWork(mapWork, context.parseContext,
context.inputs, partitions, root, alias, context.conf, false);
- // we also collect table stats while collecting column stats.
- if (context.parseContext.getAnalyzeRewrite() != null) {
- mapWork.setGatheringStats(true);
- }
}
// removes any union operator and clones the plan
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 1318c18..cd75130 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -238,7 +238,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
tblDesc.setReplicationSpec(replicationSpec);
- tblDesc.getTblProps().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+ StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
}
if (isExternalSet){
@@ -266,7 +266,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// TODO: this should ideally not create AddPartitionDesc per partition
AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
- partsDesc.getPartition(0).getPartParams().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+ StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
}
partitionDescs.add(partsDesc);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 5f2a34e..238fbd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
-
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -50,11 +48,12 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.InputFormat;
@@ -225,8 +224,10 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
List<String> bucketCols = ts.tableHandle.getBucketCols();
if (bucketCols != null && !bucketCols.isEmpty()) {
String error = StrictChecks.checkBucketing(conf);
- if (error != null) throw new SemanticException("Please load into an intermediate table"
- + " and use 'insert... select' to allow Hive to enforce bucketing. " + error);
+ if (error != null) {
+ throw new SemanticException("Please load into an intermediate table"
+ + " and use 'insert... select' to allow Hive to enforce bucketing. " + error);
+ }
}
if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) {
@@ -313,11 +314,11 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
// Update the stats which do not require a complete scan.
Task<? extends Serializable> statTask = null;
if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- StatsWork statDesc = new StatsWork(loadTableWork);
- statDesc.setNoStatsAggregator(true);
- statDesc.setClearAggregatorStats(true);
- statDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- statTask = TaskFactory.get(statDesc, conf);
+ BasicStatsWork basicStatsWork = new BasicStatsWork(loadTableWork);
+ basicStatsWork.setNoStatsAggregator(true);
+ basicStatsWork.setClearAggregatorStats(true);
+ StatsWork columnStatsWork = new StatsWork(ts.tableHandle, basicStatsWork, conf);
+ statTask = TaskFactory.get(columnStatsWork, conf);
}
// HIVE-3334 has been filed for load file with index auto update
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
index 9309fbd..1c2ad7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Stack;
@@ -26,30 +25,25 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.mapred.InputFormat;
/**
* ProcessAnalyzeTable sets up work for the several variants of analyze table
- * (normal, no scan, partial scan.) The plan at this point will be a single
+ * (normal, no scan.) The plan at this point will be a single
* table scan operator.
*/
public class ProcessAnalyzeTable implements NodeProcessor {
@@ -76,8 +70,8 @@ public class ProcessAnalyzeTable implements NodeProcessor {
TableScanOperator tableScan = (TableScanOperator) nd;
ParseContext parseContext = context.parseContext;
- Class<? extends InputFormat> inputFormat = tableScan.getConf().getTableMetadata()
- .getInputFormatClass();
+ Table table = tableScan.getConf().getTableMetadata();
+ Class<? extends InputFormat> inputFormat = table.getInputFormatClass();
if (parseContext.getQueryProperties().isAnalyzeCommand()) {
@@ -99,20 +93,17 @@ public class ProcessAnalyzeTable implements NodeProcessor {
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
// There will not be any Tez job above this task
- StatsNoJobWork snjWork = new StatsNoJobWork(tableScan.getConf().getTableMetadata()
- .getTableSpec());
- snjWork.setStatsReliable(parseContext.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+ StatsWork statWork = new StatsWork(table, parseContext.getConf());
+ statWork.setFooterScan();
+
// If partition is specified, get pruned partition list
Set<Partition> confirmedParts = GenMapRedUtils.getConfirmedPartitionsForScan(tableScan);
if (confirmedParts.size() > 0) {
- Table source = tableScan.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(tableScan);
- PrunedPartitionList partList = new PrunedPartitionList(source, confirmedParts, partCols,
- false);
- snjWork.setPrunedPartitionList(partList);
+ PrunedPartitionList partList = new PrunedPartitionList(table, confirmedParts, partCols, false);
+ statWork.addInputPartitions(partList.getPartitions());
}
- Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseContext.getConf());
+ Task<StatsWork> snjTask = TaskFactory.get(statWork, parseContext.getConf());
snjTask.setParentTasks(null);
context.rootTasks.remove(context.currentTask);
context.rootTasks.add(snjTask);
@@ -123,20 +114,19 @@ public class ProcessAnalyzeTable implements NodeProcessor {
// The plan consists of a simple TezTask followed by a StatsTask.
// The Tez task is just a simple TableScanOperator
- StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec());
- statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix());
- statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir());
- statsWork.setSourceTask(context.currentTask);
- statsWork.setStatsReliable(parseContext.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseContext.getConf());
+ BasicStatsWork basicStatsWork = new BasicStatsWork(table.getTableSpec());
+ basicStatsWork.setNoScanAnalyzeCommand(parseContext.getQueryProperties().isNoScanAnalyzeCommand());
+ StatsWork columnStatsWork = new StatsWork(table, basicStatsWork, parseContext.getConf());
+ columnStatsWork.collectStatsFromAggregator(tableScan.getConf());
+
+ columnStatsWork.setSourceTask(context.currentTask);
+ Task<StatsWork> statsTask = TaskFactory.get(columnStatsWork, parseContext.getConf());
context.currentTask.addDependentTask(statsTask);
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
// The plan consists of a StatsTask only.
if (parseContext.getQueryProperties().isNoScanAnalyzeCommand()) {
statsTask.setParentTasks(null);
- statsWork.setNoScanAnalyzeCommand(true);
context.rootTasks.remove(context.currentTask);
context.rootTasks.add(statsTask);
}
@@ -147,9 +137,8 @@ public class ProcessAnalyzeTable implements NodeProcessor {
Set<Partition> confirmedPartns = GenMapRedUtils.getConfirmedPartitionsForScan(tableScan);
PrunedPartitionList partitions = null;
if (confirmedPartns.size() > 0) {
- Table source = tableScan.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(tableScan);
- partitions = new PrunedPartitionList(source, confirmedPartns, partCols, false);
+ partitions = new PrunedPartitionList(table, confirmedPartns, partCols, false);
}
MapWork w = utils.createMapWork(context, tableScan, tezWork, partitions);
@@ -157,55 +146,8 @@ public class ProcessAnalyzeTable implements NodeProcessor {
return true;
}
- } else if (parseContext.getAnalyzeRewrite() != null) {
- // we need to collect table stats while collecting column stats.
- try {
- context.currentTask.addDependentTask(genTableStats(context, tableScan));
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
}
return null;
}
-
- private Task<?> genTableStats(GenTezProcContext context, TableScanOperator tableScan)
- throws HiveException {
- Class<? extends InputFormat> inputFormat = tableScan.getConf().getTableMetadata()
- .getInputFormatClass();
- ParseContext parseContext = context.parseContext;
- Table table = tableScan.getConf().getTableMetadata();
- List<Partition> partitions = new ArrayList<>();
- if (table.isPartitioned()) {
- partitions.addAll(parseContext.getPrunedPartitions(tableScan).getPartitions());
- for (Partition partn : partitions) {
- LOG.debug("XXX: adding part: " + partn);
- context.outputs.add(new WriteEntity(partn, WriteEntity.WriteType.DDL_NO_LOCK));
- }
- }
- TableSpec tableSpec = new TableSpec(table, partitions);
- tableScan.getConf().getTableMetadata().setTableSpec(tableSpec);
-
- if (inputFormat.equals(OrcInputFormat.class)) {
- // For ORC, there is no Tez Job for table stats.
- StatsNoJobWork snjWork = new StatsNoJobWork(tableScan.getConf().getTableMetadata()
- .getTableSpec());
- snjWork.setStatsReliable(parseContext.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- // If partition is specified, get pruned partition list
- if (partitions.size() > 0) {
- snjWork.setPrunedPartitionList(parseContext.getPrunedPartitions(tableScan));
- }
- return TaskFactory.get(snjWork, parseContext.getConf());
- } else {
-
- StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec());
- statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix());
- statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir());
- statsWork.setSourceTask(context.currentTask);
- statsWork.setStatsReliable(parseContext.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- return TaskFactory.get(statsWork, parseContext.getConf());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 7a7460e..68240f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7150,7 +7150,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// the following code is used to collect column stats when
// hive.stats.autogather=true
// and it is an insert overwrite or insert into table
- if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+ if (dest_tab != null
+ && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
&& conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
&& ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
@@ -10655,10 +10656,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
throws SemanticException {
// if it is not analyze command and not column stats, then do not gatherstats
- // if it is column stats, but it is not tez, do not gatherstats
- if ((!qbp.isAnalyzeCommand() && qbp.getAnalyzeRewrite() == null)
- || (qbp.getAnalyzeRewrite() != null && !HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"))) {
+ if (!qbp.isAnalyzeCommand() && qbp.getAnalyzeRewrite() == null) {
tsDesc.setGatherStats(false);
} else {
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index a63f709..7b29370 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -22,8 +22,10 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -39,9 +41,9 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -49,12 +51,16 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -64,6 +70,7 @@ import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -74,16 +81,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
+import org.apache.hadoop.mapred.InputFormat;
/**
* TaskCompiler is a the base class for classes that compile
@@ -273,9 +271,65 @@ public abstract class TaskCompiler {
/*
* If the query was the result of analyze table column compute statistics rewrite, create
* a column stats task instead of a fetch task to persist stats to the metastore.
+ * As per HIVE-15903, we will also collect table stats when user computes column stats.
+ * That means, if isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()
+ * We need to collect table stats
+ * if isCStats, we need to include a basic stats task
+ * else it is ColumnStatsAutoGather, which should have a move task with a stats task already.
*/
if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) {
- createColumnStatsTasks(pCtx, rootTasks, loadFileWork, isCStats, outerQueryLimit);
+ // map from tablename to task (ColumnStatsTask which includes a BasicStatsTask)
+ Map<String, StatsTask> map = new LinkedHashMap<>();
+ if (isCStats) {
+ if (rootTasks == null || rootTasks.size() != 1 || pCtx.getTopOps() == null
+ || pCtx.getTopOps().size() != 1) {
+ throw new SemanticException("Can not find correct root task!");
+ }
+ try {
+ Task<? extends Serializable> root = rootTasks.iterator().next();
+ StatsTask tsk = (StatsTask) genTableStats(pCtx, pCtx.getTopOps().values()
+ .iterator().next(), root, outputs);
+ root.addDependentTask(tsk);
+ map.put(extractTableFullName(tsk), tsk);
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, map, outerQueryLimit, 0);
+ } else {
+ Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>();
+ getLeafTasks(rootTasks, leafTasks);
+ List<Task<? extends Serializable>> nonStatsLeafTasks = new ArrayList<>();
+ for (Task<? extends Serializable> tsk : leafTasks) {
+ // map table name to the correct ColumnStatsTask
+ if (tsk instanceof StatsTask) {
+ map.put(extractTableFullName((StatsTask) tsk), (StatsTask) tsk);
+ } else {
+ nonStatsLeafTasks.add(tsk);
+ }
+ }
+ // add cStatsTask as a dependent of all the nonStatsLeafTasks
+ for (Task<? extends Serializable> tsk : nonStatsLeafTasks) {
+ for (Task<? extends Serializable> cStatsTask : map.values()) {
+ tsk.addDependentTask(cStatsTask);
+ }
+ }
+ for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
+ .getColumnStatsAutoGatherContexts()) {
+ if (!columnStatsAutoGatherContext.isInsertInto()) {
+ genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+ columnStatsAutoGatherContext.getLoadFileWork(), map, outerQueryLimit, 0);
+ } else {
+ int numBitVector;
+ try {
+ numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
+ } catch (Exception e) {
+ throw new SemanticException(e.getMessage());
+ }
+ genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
+ columnStatsAutoGatherContext.getLoadFileWork(), map, outerQueryLimit, numBitVector);
+ }
+ }
+ }
}
decideExecMode(rootTasks, ctx, globalLimitCtx);
@@ -322,6 +376,44 @@ public abstract class TaskCompiler {
}
}
+ private String extractTableFullName(StatsTask tsk) throws SemanticException {
+ return tsk.getWork().getFullTableName();
+ }
+
+ private Task<?> genTableStats(ParseContext parseContext, TableScanOperator tableScan, Task currentTask, final HashSet<WriteEntity> outputs)
+ throws HiveException {
+ Class<? extends InputFormat> inputFormat = tableScan.getConf().getTableMetadata()
+ .getInputFormatClass();
+ Table table = tableScan.getConf().getTableMetadata();
+ List<Partition> partitions = new ArrayList<>();
+ if (table.isPartitioned()) {
+ partitions.addAll(parseContext.getPrunedPartitions(tableScan).getPartitions());
+ for (Partition partn : partitions) {
+ LOG.trace("adding part: " + partn);
+ outputs.add(new WriteEntity(partn, WriteEntity.WriteType.DDL_NO_LOCK));
+ }
+ }
+ TableSpec tableSpec = new TableSpec(table, partitions);
+ tableScan.getConf().getTableMetadata().setTableSpec(tableSpec);
+
+ if (inputFormat.equals(OrcInputFormat.class)) {
+ // For ORC, there is no Tez Job for table stats.
+ StatsWork columnStatsWork = new StatsWork(table, parseContext.getConf());
+ columnStatsWork.setFooterScan();
+ // If partition is specified, get pruned partition list
+ if (partitions.size() > 0) {
+ columnStatsWork.addInputPartitions(parseContext.getPrunedPartitions(tableScan).getPartitions());
+ }
+ return TaskFactory.get(columnStatsWork, parseContext.getConf());
+ } else {
+ BasicStatsWork statsWork = new BasicStatsWork(tableScan.getConf().getTableMetadata().getTableSpec());
+ StatsWork columnStatsWork = new StatsWork(table, statsWork, parseContext.getConf());
+ columnStatsWork.collectStatsFromAggregator(tableScan.getConf());
+ columnStatsWork.setSourceTask(currentTask);
+ return TaskFactory.get(columnStatsWork, parseContext.getConf());
+ }
+ }
+
private void setLoadFileLocation(
final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
// CTAS; make the movetask's destination directory the table's destination.
@@ -353,34 +445,6 @@ public abstract class TaskCompiler {
lfd.setTargetDir(location);
}
- private void createColumnStatsTasks(final ParseContext pCtx,
- final List<Task<? extends Serializable>> rootTasks,
- List<LoadFileDesc> loadFileWork, boolean isCStats, int outerQueryLimit)
- throws SemanticException {
- Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>();
- getLeafTasks(rootTasks, leafTasks);
- if (isCStats) {
- genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0);
- } else {
- for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
- .getColumnStatsAutoGatherContexts()) {
- if (!columnStatsAutoGatherContext.isInsertInto()) {
- genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
- columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0);
- } else {
- int numBitVector;
- try {
- numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
- } catch (Exception e) {
- throw new SemanticException(e.getMessage());
- }
- genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
- columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector);
- }
- }
- }
- }
-
private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticException {
try {
String protoName = null;
@@ -419,10 +483,8 @@ public abstract class TaskCompiler {
}
}
- // find all leaf tasks and make the DDLTask as a dependent task of all of
- // them
- HashSet<Task<? extends Serializable>> leaves =
- new LinkedHashSet<>();
+ // find all leaf tasks and make the DDLTask as a dependent task on all of them
+ HashSet<Task<? extends Serializable>> leaves = new LinkedHashSet<>();
getLeafTasks(rootTasks, leaves);
assert (leaves.size() > 0);
for (Task<? extends Serializable> task : leaves) {
@@ -452,10 +514,8 @@ public abstract class TaskCompiler {
*/
@SuppressWarnings("unchecked")
protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite,
- List<LoadFileDesc> loadFileWork, Set<Task<? extends Serializable>> leafTasks,
- int outerQueryLimit, int numBitVector) {
- ColumnStatsTask cStatsTask;
- ColumnStatsWork cStatsWork;
+ List<LoadFileDesc> loadFileWork, Map<String, StatsTask> map,
+ int outerQueryLimit, int numBitVector) throws SemanticException {
FetchWork fetch;
String tableName = analyzeRewrite.getTableName();
List<String> colName = analyzeRewrite.getColName();
@@ -482,11 +542,12 @@ public abstract class TaskCompiler {
fetch = new FetchWork(loadFileWork.get(0).getSourcePath(), resultTab, outerQueryLimit);
ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName,
- colName, colType, isTblLevel, numBitVector);
- cStatsWork = new ColumnStatsWork(fetch, cStatsDesc, SessionState.get().getCurrentDatabase());
- cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
- for (Task<? extends Serializable> tsk : leafTasks) {
- tsk.addDependentTask(cStatsTask);
+ colName, colType, isTblLevel, numBitVector, fetch);
+ StatsTask columnStatsTask = map.get(tableName);
+ if (columnStatsTask == null) {
+ throw new SemanticException("Can not find " + tableName + " in genColumnStatsTask");
+ } else {
+ columnStatsTask.getWork().setColStats(cStatsDesc);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index 6f21cae..36fe8a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -25,7 +25,6 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -40,10 +39,10 @@ import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.mapred.InputFormat;
import com.google.common.base.Preconditions;
@@ -78,9 +77,9 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
ParseContext parseContext = context.parseContext;
+ Table table = tableScan.getConf().getTableMetadata();
@SuppressWarnings("rawtypes")
- Class<? extends InputFormat> inputFormat = tableScan.getConf().getTableMetadata()
- .getInputFormatClass();
+ Class<? extends InputFormat> inputFormat = table.getInputFormatClass();
if (parseContext.getQueryProperties().isAnalyzeCommand()) {
Preconditions.checkArgument(tableScan.getChildOperators() == null
@@ -103,19 +102,16 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
// There will not be any Spark job above this task
- StatsNoJobWork snjWork = new StatsNoJobWork(tableScan.getConf().getTableMetadata().getTableSpec());
- snjWork.setStatsReliable(parseContext.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+ StatsWork statWork = new StatsWork(table, parseContext.getConf());
+ statWork.setFooterScan();
// If partition is specified, get pruned partition list
Set<Partition> confirmedParts = GenMapRedUtils.getConfirmedPartitionsForScan(tableScan);
if (confirmedParts.size() > 0) {
- Table source = tableScan.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(tableScan);
- PrunedPartitionList partList = new PrunedPartitionList(source, confirmedParts, partCols,
- false);
- snjWork.setPrunedPartitionList(partList);
+ PrunedPartitionList partList = new PrunedPartitionList(table, confirmedParts, partCols, false);
+ statWork.addInputPartitions(partList.getPartitions());
}
- Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseContext.getConf());
+ Task<StatsWork> snjTask = TaskFactory.get(statWork, parseContext.getConf());
snjTask.setParentTasks(null);
context.rootTasks.remove(context.currentTask);
context.rootTasks.add(snjTask);
@@ -126,19 +122,18 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
// The plan consists of a simple SparkTask followed by a StatsTask.
// The Spark task is just a simple TableScanOperator
- StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec());
- statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix());
- statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir());
- statsWork.setSourceTask(context.currentTask);
- statsWork.setStatsReliable(parseContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseContext.getConf());
+ BasicStatsWork basicStatsWork = new BasicStatsWork(table.getTableSpec());
+ basicStatsWork.setNoScanAnalyzeCommand(parseContext.getQueryProperties().isNoScanAnalyzeCommand());
+ StatsWork columnStatsWork = new StatsWork(table, basicStatsWork, parseContext.getConf());
+ columnStatsWork.collectStatsFromAggregator(tableScan.getConf());
+ columnStatsWork.setSourceTask(context.currentTask);
+ Task<StatsWork> statsTask = TaskFactory.get(columnStatsWork, parseContext.getConf());
context.currentTask.addDependentTask(statsTask);
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
// The plan consists of a StatsTask only.
if (parseContext.getQueryProperties().isNoScanAnalyzeCommand()) {
statsTask.setParentTasks(null);
- statsWork.setNoScanAnalyzeCommand(true);
context.rootTasks.remove(context.currentTask);
context.rootTasks.add(statsTask);
}
@@ -148,9 +143,8 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
Set<Partition> confirmedPartns = GenMapRedUtils.getConfirmedPartitionsForScan(tableScan);
PrunedPartitionList partitions = null;
if (confirmedPartns.size() > 0) {
- Table source = tableScan.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(tableScan);
- partitions = new PrunedPartitionList(source, confirmedPartns, partCols, false);
+ partitions = new PrunedPartitionList(table, confirmedPartns, partCols, false);
}
MapWork w = utils.createMapWork(context, tableScan, sparkWork, partitions);
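Taken together, the hunks above leave SparkProcessAnalyzeTable producing a single task type, Task<StatsWork>, for every ANALYZE variant; only the configuration of the StatsWork differs. A sketch of the two shapes follows, assuming the variables (table, parseContext, tableScan, context, partList) from the enclosing process() method; the local variable names here are illustrative:

  // (a) stats can be read from file footers/metadata: no Spark job runs above the stats task
  StatsWork footerScanWork = new StatsWork(table, parseContext.getConf());
  footerScanWork.setFooterScan();
  footerScanWork.addInputPartitions(partList.getPartitions());
  Task<StatsWork> footerScanTask = TaskFactory.get(footerScanWork, parseContext.getConf());

  // (b) a scan is needed: a simple SparkTask (the table scan) feeds one StatsTask
  BasicStatsWork basicStatsWork = new BasicStatsWork(table.getTableSpec());
  basicStatsWork.setNoScanAnalyzeCommand(parseContext.getQueryProperties().isNoScanAnalyzeCommand());
  StatsWork aggregatorWork = new StatsWork(table, basicStatsWork, parseContext.getConf());
  aggregatorWork.collectStatsFromAggregator(tableScan.getConf());
  aggregatorWork.setSourceTask(context.currentTask);
  Task<StatsWork> statsTask = TaskFactory.get(aggregatorWork, parseContext.getConf());

For the noscan case in (b) the patch additionally clears the StatsTask's parents and promotes it to a root task, as before.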
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsNoJobWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsNoJobWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsNoJobWork.java
new file mode 100644
index 0000000..d4f6a41
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsNoJobWork.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+
+/**
+ * Client-side stats aggregator work (no job is launched).
+ */
+public class BasicStatsNoJobWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private TableSpec tableSpecs;
+ private boolean statsReliable;
+
+ private Set<Partition> pp;
+
+ public BasicStatsNoJobWork(TableSpec tableSpecs) {
+ this.tableSpecs = tableSpecs;
+ }
+
+ public TableSpec getTableSpecs() {
+ return tableSpecs;
+ }
+
+ public void setStatsReliable(boolean s1) {
+ statsReliable = s1;
+ }
+
+ public boolean isStatsReliable() {
+ return statsReliable;
+ }
+
+ public Set<Partition> getPartitions() {
+ return pp;
+ }
+
+ public void setPartitions(Set<Partition> partitions) {
+ pp = partitions;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java
new file mode 100644
index 0000000..0621bd4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BasicStatsWork.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
+
+
+/**
+ * Basic (table/partition level) statistics gathering work.
+ *
+ */
+public class BasicStatsWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private TableSpec tableSpecs; // source table spec -- for TableScanOperator
+ private LoadTableDesc loadTableDesc; // same as MoveWork.loadTableDesc -- for FileSinkOperator
+ private LoadFileDesc loadFileDesc; // same as MoveWork.loadFileDesc -- for FileSinkOperator
+ private String aggKey; // aggregation key prefix
+ private boolean statsReliable; // are stats completely reliable
+
+ // If no stats aggregator is present, clear the currently stored aggregator stats.
+ // For example, if a merge is being performed, the stats already collected by the aggregator
+ // (numRows etc.) are still valid. However, if a load file is being performed, the old
+ // aggregator stats are no longer valid, so it is better to clear them than to keep
+ // stale, incorrect values.
+ // Since HIVE-12661 we keep the old stats (even though they may be wrong) for CBO
+ // purposes, and use the COLUMN_STATS_ACCURATE flag to
+ // indicate how accurate the stats are.
+
+ private boolean clearAggregatorStats = false;
+
+ private boolean noStatsAggregator = false;
+
+ private boolean isNoScanAnalyzeCommand = false;
+
+ // sourceTask for TS is not changed (currently) but that of FS might be changed
+ // by various optimizers (auto.convert.join, for example)
+ // so this is set by DriverContext in runtime
+ private transient Task sourceTask;
+
+ private boolean isFollowedByColStats = false;
+
+ // used by FS based stats collector
+ private String statsTmpDir;
+
+ public BasicStatsWork() {
+ }
+
+ public BasicStatsWork(TableSpec tableSpecs) {
+ this.tableSpecs = tableSpecs;
+ }
+
+ public BasicStatsWork(LoadTableDesc loadTableDesc) {
+ this.loadTableDesc = loadTableDesc;
+ }
+
+ public BasicStatsWork(LoadFileDesc loadFileDesc) {
+ this.loadFileDesc = loadFileDesc;
+ }
+
+ public TableSpec getTableSpecs() {
+ return tableSpecs;
+ }
+
+ public LoadTableDesc getLoadTableDesc() {
+ return loadTableDesc;
+ }
+
+ public LoadFileDesc getLoadFileDesc() {
+ return loadFileDesc;
+ }
+
+ public void setAggKey(String aggK) {
+ aggKey = aggK;
+ }
+
+ @Explain(displayName = "Stats Aggregation Key Prefix", explainLevels = { Level.EXTENDED })
+ public String getAggKey() {
+ return aggKey;
+ }
+
+ public String getStatsTmpDir() {
+ return statsTmpDir;
+ }
+
+ public void setStatsTmpDir(String statsTmpDir) {
+ this.statsTmpDir = statsTmpDir;
+ }
+
+ public boolean getNoStatsAggregator() {
+ return noStatsAggregator;
+ }
+
+ public void setNoStatsAggregator(boolean noStatsAggregator) {
+ this.noStatsAggregator = noStatsAggregator;
+ }
+
+ public boolean isStatsReliable() {
+ return statsReliable;
+ }
+
+ public void setStatsReliable(boolean statsReliable) {
+ this.statsReliable = statsReliable;
+ }
+
+ public boolean isClearAggregatorStats() {
+ return clearAggregatorStats;
+ }
+
+ public void setClearAggregatorStats(boolean clearAggregatorStats) {
+ this.clearAggregatorStats = clearAggregatorStats;
+ }
+
+ /**
+ * @return the isNoScanAnalyzeCommand
+ */
+ public boolean isNoScanAnalyzeCommand() {
+ return isNoScanAnalyzeCommand;
+ }
+
+ /**
+ * @param isNoScanAnalyzeCommand the isNoScanAnalyzeCommand to set
+ */
+ public void setNoScanAnalyzeCommand(boolean isNoScanAnalyzeCommand) {
+ this.isNoScanAnalyzeCommand = isNoScanAnalyzeCommand;
+ }
+
+ public Task getSourceTask() {
+ return sourceTask;
+ }
+
+ public void setSourceTask(Task sourceTask) {
+ this.sourceTask = sourceTask;
+ }
+
+ public boolean isFollowedByColStats1() {
+ return isFollowedByColStats;
+ }
+
+ public void setFollowedByColStats1(boolean isFollowedByColStats) {
+ this.isFollowedByColStats = isFollowedByColStats;
+ }
+
+ public boolean isExplicitAnalyze() {
+ // ANALYZE TABLE
+ return (getTableSpecs() != null);
+ }
+ public boolean isTargetRewritten() {
+ // ANALYZE TABLE
+ if (isExplicitAnalyze()) {
+ return true;
+ }
+ // INSERT OVERWRITE
+ if (getLoadTableDesc() != null && getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL) {
+ return true;
+ }
+ // CREATE TABLE ... AS
+ if (getLoadFileDesc() != null && getLoadFileDesc().getCtasCreateTableDesc() != null) {
+ return true;
+ }
+ return false;
+ }
+
+ public String getTableName() {
+ BasicStatsWork work = this;
+ if (work.getLoadTableDesc() != null) {
+ return work.getLoadTableDesc().getTable().getTableName();
+ } else if (work.getTableSpecs() != null) {
+ return work.getTableSpecs().tableName;
+ } else {
+ return getLoadFileDesc().getCtasCreateTableDesc().getTableName();
+ }
+ }
+
+}
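BasicStatsWork keeps one field per way a statistics target can be named, and getTableName() resolves the target from whichever of them is set; isTargetRewritten() is true exactly for the statements that replace the target's contents (ANALYZE, INSERT OVERWRITE with LoadFileType.REPLACE_ALL, and CTAS). A sketch of the three entry points; table, loadTableDesc and loadFileDesc stand in for values available at the respective call sites:

  // ANALYZE TABLE t [PARTITION (...)] COMPUTE STATISTICS: target named by a TableSpec
  BasicStatsWork analyzeWork = new BasicStatsWork(table.getTableSpec());

  // INSERT OVERWRITE TABLE t ...: target named by the MoveWork's LoadTableDesc
  BasicStatsWork insertWork = new BasicStatsWork(loadTableDesc);

  // CREATE TABLE t AS SELECT ...: target named by the MoveWork's LoadFileDesc (CTAS desc attached)
  BasicStatsWork ctasWork = new BasicStatsWork(loadFileDesc);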
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsDesc.java
index 97f323f..1ca7ea6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsDesc.java
@@ -28,31 +28,25 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
public class ColumnStatsDesc extends DDLDesc implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
+ private FetchWork fWork;
+
private boolean isTblLevel;
private int numBitVector;
+ private boolean needMerge;
private String tableName;
private List<String> colName;
private List<String> colType;
- public ColumnStatsDesc() {
- }
- public ColumnStatsDesc(String tableName, List<String> colName, List<String> colType,
- boolean isTblLevel) {
- this.tableName = tableName;
- this.colName = colName;
- this.colType = colType;
- this.isTblLevel = isTblLevel;
- this.numBitVector = 0;
- }
-
public ColumnStatsDesc(String tableName, List<String> colName,
- List<String> colType, boolean isTblLevel, int numBitVector) {
+ List<String> colType, boolean isTblLevel, int numBitVector, FetchWork fWork1) {
this.tableName = tableName;
this.colName = colName;
this.colType = colType;
this.isTblLevel = isTblLevel;
this.numBitVector = numBitVector;
+ this.needMerge = this.numBitVector != 0;
+ this.fWork = fWork1;
}
@Explain(displayName = "Table")
@@ -99,4 +93,13 @@ public class ColumnStatsDesc extends DDLDesc implements Serializable, Cloneable
this.numBitVector = numBitVector;
}
+ public boolean isNeedMerge() {
+ return needMerge;
+ }
+
+
+ public FetchWork getFWork() {
+ return fWork;
+ }
+
}
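With ColumnStatsWork removed (next file), the FetchWork it used to carry now travels on ColumnStatsDesc itself, and needMerge is derived directly from whether bit vectors were requested. A minimal sketch, with colNames, colTypes and fetch as placeholder variables:

  ColumnStatsDesc desc = new ColumnStatsDesc(tableName, colNames, colTypes,
      isTblLevel, numBitVector, fetch);
  FetchWork fw = desc.getFWork();        // previously ColumnStatsWork.getfWork()
  boolean merge = desc.isNeedMerge();    // true exactly when numBitVector != 0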
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
deleted file mode 100644
index 842fd1a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.plan;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
-
-/**
- * ColumnStats Work.
- *
- */
-@Explain(displayName = "Column Stats Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ColumnStatsWork implements Serializable {
- private static final long serialVersionUID = 1L;
- private FetchWork fWork;
- private ColumnStatsDesc colStats;
- private String currentDatabaseName;
- private static final int LIMIT = -1;
-
-
- public ColumnStatsWork() {
- }
-
- public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats, String currentDatabaseName) {
- this.fWork = work;
- this.setColStats(colStats);
- this.currentDatabaseName = currentDatabaseName;
- }
-
- @Override
- public String toString() {
- String ret;
- ret = fWork.toString();
- return ret;
- }
-
- public FetchWork getfWork() {
- return fWork;
- }
-
- public void setfWork(FetchWork fWork) {
- this.fWork = fWork;
- }
-
- @Explain(displayName = "Column Stats Desc")
- public ColumnStatsDesc getColStats() {
- return colStats;
- }
-
- public void setColStats(ColumnStatsDesc colStats) {
- this.colStats = colStats;
- }
-
- public ListSinkOperator getSink() {
- return fWork.getSink();
- }
-
- public void initializeForFetch(CompilationOpContext ctx) {
- fWork.initializeForFetch(ctx);
- }
-
- public int getLeastNumRows() {
- return fWork.getLeastNumRows();
- }
-
- public static int getLimit() {
- return LIMIT;
- }
-
- public String getCurrentDatabaseName() {
- return currentDatabaseName;
- }
-
- public void setCurrentDatabaseName(String currentDatabaseName) {
- this.currentDatabaseName = currentDatabaseName;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 550e6f8..92c9768 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
*
*/
@Explain(displayName = "File Output Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class FileSinkDesc extends AbstractOperatorDesc {
+public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDesc {
private static final long serialVersionUID = 1L;
public enum DPSortState {
@@ -373,6 +373,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
this.gatherStats = gatherStats;
}
+ @Override
@Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
public boolean isGatherStats() {
return gatherStats;
@@ -389,6 +390,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
* will be aggregated.
* @return key prefix used for stats publishing and aggregation.
*/
+ @Override
@Explain(displayName = "Stats Publishing Key Prefix", explainLevels = { Level.EXTENDED })
public String getStatsAggPrefix() {
// dirName uniquely identifies destination directory of a FileSinkOperator.
@@ -511,7 +513,8 @@ public class FileSinkDesc extends AbstractOperatorDesc {
}
- public String getStatsTmpDir() {
+ @Override
+ public String getTmpStatsDir() {
return statsTmpDir;
}
@@ -577,4 +580,5 @@ public class FileSinkDesc extends AbstractOperatorDesc {
}
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/IStatsGatherDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/IStatsGatherDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/IStatsGatherDesc.java
new file mode 100644
index 0000000..a83c4fb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/IStatsGatherDesc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+public interface IStatsGatherDesc {
+ public boolean isGatherStats();
+
+ String getTmpStatsDir();
+
+ String getStatsAggPrefix();
+
+}
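IStatsGatherDesc factors the stats-related accessors out of FileSinkDesc (and any other operator descriptor that gathers stats) so that stats wiring code does not need to know the concrete descriptor type. A hypothetical helper, written only to illustrate the contract:

  static String statsSummary(IStatsGatherDesc desc) {
    if (!desc.isGatherStats()) {
      return "stats gathering disabled";
    }
    return "agg key prefix=" + desc.getStatsAggPrefix() + ", tmp dir=" + desc.getTmpStatsDir();
  }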
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 30d9912..c09589c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -35,7 +35,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
// list of columns, comma separated
private String columns;
private String columnTypes;
- private String destinationCreateTable;
+ private transient CreateTableDesc ctasCreateTableDesc;
private boolean isMmCtas;
public LoadFileDesc(final LoadFileDesc o) {
@@ -45,23 +45,16 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
this.isDfsDir = o.isDfsDir;
this.columns = o.columns;
this.columnTypes = o.columnTypes;
- this.destinationCreateTable = o.destinationCreateTable;
this.isMmCtas = o.isMmCtas;
+ this.ctasCreateTableDesc = o.ctasCreateTableDesc;
}
public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc createViewDesc,
final Path sourcePath, final Path targetDir, final boolean isDfsDir,
- final String columns, final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) {
+ final String columns, final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) {
this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas);
- if (createTableDesc != null && createTableDesc.getDatabaseName() != null
- && createTableDesc.getTableName() != null) {
- destinationCreateTable = (createTableDesc.getTableName().contains(".") ? "" : createTableDesc
- .getDatabaseName() + ".")
- + createTableDesc.getTableName();
- } else if (createViewDesc != null) {
- // The work is already done in analyzeCreateView to assure that the view name is fully
- // qualified.
- destinationCreateTable = createViewDesc.getViewName();
+ if (createTableDesc != null && createTableDesc.isCTAS()) {
+ ctasCreateTableDesc = createTableDesc;
}
}
@@ -131,11 +124,8 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
this.columnTypes = columnTypes;
}
- /**
- * @return the destinationCreateTable
- */
- public String getDestinationCreateTable(){
- return destinationCreateTable;
+ public CreateTableDesc getCtasCreateTableDesc() {
+ return ctasCreateTableDesc;
}
public boolean isMmCtas() {
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java
deleted file mode 100644
index 77c04f6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsNoJobWork.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.plan;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
-
-
-/**
- * Client-side stats aggregator task.
- */
-@Explain(displayName = "Stats-Aggr Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class StatsNoJobWork implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private TableSpec tableSpecs;
- private boolean statsReliable;
- private PrunedPartitionList prunedPartitionList;
-
- public StatsNoJobWork() {
- }
-
- public StatsNoJobWork(TableSpec tableSpecs) {
- this.tableSpecs = tableSpecs;
- }
-
- public StatsNoJobWork(boolean statsReliable) {
- this.statsReliable = statsReliable;
- }
-
- public TableSpec getTableSpecs() {
- return tableSpecs;
- }
-
- public boolean isStatsReliable() {
- return statsReliable;
- }
-
- public void setStatsReliable(boolean statsReliable) {
- this.statsReliable = statsReliable;
- }
-
- public void setPrunedPartitionList(PrunedPartitionList prunedPartitionList) {
- this.prunedPartitionList = prunedPartitionList;
- }
-
- public PrunedPartitionList getPrunedPartitionList() {
- return prunedPartitionList;
- }
-}