Posted to commits@hive.apache.org by ha...@apache.org on 2017/11/07 22:33:56 UTC
[19/22] hive git commit: HIVE-16827 : Merge stats task and column stats task into a single task (Zoltan Haindrich via Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index 74629d5..b2bd465 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -19,136 +19,153 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
+import org.apache.hadoop.hive.ql.session.SessionState;
/**
- * ConditionalStats.
+ * Stats work; may include basic stats work and a column stats desc
*
*/
-@Explain(displayName = "Stats-Aggr Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+@Explain(displayName = "Stats Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
public class StatsWork implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private TableSpec tableSpecs; // source table spec -- for TableScanOperator
- private LoadTableDesc loadTableDesc; // same as MoveWork.loadTableDesc -- for FileSinkOperator
- private LoadFileDesc loadFileDesc; // same as MoveWork.loadFileDesc -- for FileSinkOperator
- private String aggKey; // aggregation key prefix
- private boolean statsReliable; // are stats completely reliable
-
- // If stats aggregator is not present, clear the current aggregator stats.
- // For eg. if a merge is being performed, stats already collected by aggregator (numrows etc.)
- // are still valid. However, if a load file is being performed, the old stats collected by
- // aggregator are not valid. It might be a good idea to clear them instead of leaving wrong
- // and old stats.
- // Since HIVE-12661, we maintain the old stats (although may be wrong) for CBO
- // purpose. We use a flag COLUMN_STATS_ACCURATE to
- // show the accuracy of the stats.
- private boolean clearAggregatorStats = false;
-
- private boolean noStatsAggregator = false;
+ private static final long serialVersionUID = 1L;
+ // this is for basic stats
+ private BasicStatsWork basicStatsWork;
+ private BasicStatsNoJobWork basicStatsNoJobWork;
+ private ColumnStatsDesc colStats;
+ private static final int LIMIT = -1;
- private boolean isNoScanAnalyzeCommand = false;
+ private String currentDatabase;
+ private boolean statsReliable;
+ private Table table;
+ private boolean truncate;
+ private boolean footerScan;
+ private Set<Partition> partitions = new HashSet<>();
- // sourceTask for TS is not changed (currently) but that of FS might be changed
- // by various optimizers (auto.convert.join, for example)
- // so this is set by DriverContext in runtime
- private transient Task sourceTask;
+ public StatsWork(Table table, BasicStatsWork basicStatsWork, HiveConf hconf) {
+ super();
+ this.table = table;
+ this.basicStatsWork = basicStatsWork;
+ this.currentDatabase = SessionState.get().getCurrentDatabase();
+ statsReliable = hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE);
+ basicStatsWork.setStatsReliable(statsReliable);
+ }
- // used by FS based stats collector
- private String statsTmpDir;
+ public StatsWork(Table table, HiveConf hconf) {
+ super();
+ this.table = table;
+ this.currentDatabase = SessionState.get().getCurrentDatabase();
+ statsReliable = hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE);
+ }
- public StatsWork() {
+ @Override
+ public String toString() {
+ return String.format("StatsWork; fetch: %s", getfWork());
}
- public StatsWork(TableSpec tableSpecs) {
- this.tableSpecs = tableSpecs;
+ FetchWork getfWork() {
+ return colStats == null ? null : colStats.getFWork();
}
- public StatsWork(LoadTableDesc loadTableDesc) {
- this.loadTableDesc = loadTableDesc;
+ @Explain(displayName = "Column Stats Desc")
+ public ColumnStatsDesc getColStats() {
+ return colStats;
}
- public StatsWork(LoadFileDesc loadFileDesc) {
- this.loadFileDesc = loadFileDesc;
+ public void setColStats(ColumnStatsDesc colStats) {
+ this.colStats = colStats;
}
- public TableSpec getTableSpecs() {
- return tableSpecs;
+ // unused / unknown reason
+ @Deprecated
+ public static int getLimit() {
+ return LIMIT;
}
- public LoadTableDesc getLoadTableDesc() {
- return loadTableDesc;
+ @Explain(displayName = "Basic Stats Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public BasicStatsWork getBasicStatsWork() {
+ return basicStatsWork;
}
- public LoadFileDesc getLoadFileDesc() {
- return loadFileDesc;
+ // only explain uses it
+ @Explain(displayName = "Basic Stats NoJob Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public BasicStatsNoJobWork getBasicStatsNoJobWork() {
+ return basicStatsNoJobWork;
}
- public void setAggKey(String aggK) {
- aggKey = aggK;
+ public void setSourceTask(Task<?> sourceTask) {
+ basicStatsWork.setSourceTask(sourceTask);
}
- @Explain(displayName = "Stats Aggregation Key Prefix", explainLevels = { Level.EXTENDED })
- public String getAggKey() {
- return aggKey;
+ public String getCurrentDatabaseName() {
+ return currentDatabase;
}
- public String getStatsTmpDir() {
- return statsTmpDir;
+ public boolean hasColStats() {
+ return colStats != null;
}
- public void setStatsTmpDir(String statsTmpDir) {
- this.statsTmpDir = statsTmpDir;
+ public Table getTable() {
+ return table;
}
- public boolean getNoStatsAggregator() {
- return noStatsAggregator;
+ public void collectStatsFromAggregator(IStatsGatherDesc conf) {
+ // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+ // in FileSinkDesc is used for stats publishing. They should be consistent.
+ basicStatsWork.setAggKey(conf.getStatsAggPrefix());
+ basicStatsWork.setStatsTmpDir(conf.getTmpStatsDir());
+ basicStatsWork.setStatsReliable(statsReliable);
}
- public void setNoStatsAggregator(boolean noStatsAggregator) {
- this.noStatsAggregator = noStatsAggregator;
+ public void truncateExisting(boolean truncate) {
+ this.truncate = truncate;
}
- public boolean isStatsReliable() {
- return statsReliable;
+
+ public void setFooterScan() {
+ basicStatsNoJobWork = new BasicStatsNoJobWork(table.getTableSpec());
+ basicStatsNoJobWork.setStatsReliable(getStatsReliable());
+ footerScan = true;
}
- public void setStatsReliable(boolean statsReliable) {
- this.statsReliable = statsReliable;
+ public void addInputPartitions(Set<Partition> partitions) {
+ this.partitions.addAll(partitions);
}
- public boolean isClearAggregatorStats() {
- return clearAggregatorStats;
+ public Set<Partition> getPartitions() {
+ return partitions;
}
- public void setClearAggregatorStats(boolean clearAggregatorStats) {
- this.clearAggregatorStats = clearAggregatorStats;
+ public boolean isFooterScan() {
+ return footerScan;
}
- /**
- * @return the isNoScanAnalyzeCommand
- */
- public boolean isNoScanAnalyzeCommand() {
- return isNoScanAnalyzeCommand;
+ public boolean getStatsReliable() {
+ return statsReliable;
}
- /**
- * @param isNoScanAnalyzeCommand the isNoScanAnalyzeCommand to set
- */
- public void setNoScanAnalyzeCommand(boolean isNoScanAnalyzeCommand) {
- this.isNoScanAnalyzeCommand = isNoScanAnalyzeCommand;
+ public String getFullTableName() {
+ return table.getDbName() + "." + table.getTableName();
}
public Task getSourceTask() {
- return sourceTask;
+ return basicStatsWork == null ? null : basicStatsWork.getSourceTask();
+ }
+
+ public String getAggKey() {
+ return basicStatsWork.getAggKey();
}
- public void setSourceTask(Task sourceTask) {
- this.sourceTask = sourceTask;
+ public boolean isAggregating() {
+ return basicStatsWork != null && basicStatsWork.getAggKey() != null;
}
}
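
For readers following the refactor, a minimal sketch of how the merged work object can be assembled (illustrative only, not part of this patch; tbl, basicWork and colStatsDesc stand for objects built elsewhere during semantic analysis, and an active SessionState is assumed because the constructor reads the current database):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
    import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
    import org.apache.hadoop.hive.ql.plan.StatsWork;

    class StatsWorkAssemblySketch {
      static StatsWork assemble(Table tbl, BasicStatsWork basicWork, ColumnStatsDesc colStatsDesc, HiveConf conf) {
        StatsWork statsWork = new StatsWork(tbl, basicWork, conf); // picks up hive.stats.reliable
        statsWork.setColStats(colStatsDesc);                       // optional column-stats half
        return statsWork;                                          // an ANALYZE ... NOSCAN would also call setFooterScan()
      }
    }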
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 75d0f43..237c8cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
* things will be added here as table scan is invoked as part of local work.
**/
@Explain(displayName = "TableScan", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class TableScanDesc extends AbstractOperatorDesc {
+public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherDesc {
private static final long serialVersionUID = 1L;
private String alias;
@@ -263,11 +263,13 @@ public class TableScanDesc extends AbstractOperatorDesc {
this.gatherStats = gatherStats;
}
+ @Override
@Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
public boolean isGatherStats() {
return gatherStats;
}
+ @Override
public String getTmpStatsDir() {
return tmpStatsDir;
}
@@ -296,6 +298,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
statsAggKeyPrefix = k;
}
+ @Override
@Explain(displayName = "Statistics Aggregation Key Prefix", explainLevels = { Level.EXTENDED })
public String getStatsAggPrefix() {
return statsAggKeyPrefix;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
new file mode 100644
index 0000000..d1f7652
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.plan.BasicStatsNoJobWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
+
+/**
+ * BasicStatsNoJobTask is used in cases where stats collection is the only task for the given query
+ * (no parent MR or Tez job), e.g. ANALYZE ... NOSCAN for file formats that implement the
+ * StatsProvidingRecordReader interface. The ORC format (which implements
+ * StatsProvidingRecordReader) stores statistics for all columns in the file footer, so it is much
+ * faster to compute table/partition statistics by reading the footer than by scanning all the
+ * rows. This task can be used for computing basic stats like numFiles, numRows, fileSize and
+ * rawDataSize from the ORC footer.
+ **/
+public class BasicStatsNoJobTask implements IStatsProcessor {
+
+ private static transient final Logger LOG = LoggerFactory.getLogger(BasicStatsNoJobTask.class);
+ private HiveConf conf;
+
+ private BasicStatsNoJobWork work;
+ private LogHelper console;
+
+ public BasicStatsNoJobTask(HiveConf conf, BasicStatsNoJobWork work) {
+ this.conf = conf;
+ this.work = work;
+ console = new LogHelper(LOG);
+ }
+
+
+ @Override
+ public void initialize(CompilationOpContext opContext) {
+
+ }
+
+ @Override
+ public int process(Hive db, Table tbl) throws Exception {
+
+ LOG.info("Executing stats (no job) task");
+
+ ExecutorService threadPool = StatsTask.newThreadPool(conf);
+
+ return aggregateStats(threadPool, db);
+ }
+
+ public StageType getType() {
+ return StageType.STATS;
+ }
+
+ public String getName() {
+ return "STATS-NO-JOB";
+ }
+
+ static class StatItem {
+ Partish partish;
+ Map<String, String> params;
+ Object result;
+ }
+
+ static class FooterStatCollector implements Runnable {
+
+ private Partish partish;
+ private Object result;
+ private JobConf jc;
+ private Path dir;
+ private FileSystem fs;
+ private LogHelper console;
+
+ public FooterStatCollector(JobConf jc, Partish partish) {
+ this.jc = jc;
+ this.partish = partish;
+ }
+
+ public static final Function<FooterStatCollector, String> SIMPLE_NAME_FUNCTION = new Function<FooterStatCollector, String>() {
+
+ @Override
+ public String apply(FooterStatCollector sc) {
+ return String.format("%s#%s", sc.partish.getTable().getCompleteName(), sc.partish.getPartishType());
+ }
+ };
+ private static final Function<FooterStatCollector, Partition> EXTRACT_RESULT_FUNCTION = new Function<FooterStatCollector, Partition>() {
+ @Override
+ public Partition apply(FooterStatCollector input) {
+ return (Partition) input.result;
+ }
+ };
+
+ private boolean isValid() {
+ return result != null;
+ }
+
+ public void init(HiveConf conf, LogHelper console) throws IOException {
+ this.console = console;
+ dir = new Path(partish.getPartSd().getLocation());
+ fs = dir.getFileSystem(conf);
+ }
+
+ @Override
+ public void run() {
+
+ Map<String, String> parameters = partish.getPartParameters();
+ try {
+ long numRows = 0;
+ long rawDataSize = 0;
+ long fileSize = 0;
+ long numFiles = 0;
+ LOG.debug("Aggregating stats for {}", dir);
+ FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
+
+ for (FileStatus file : fileList) {
+ LOG.debug("Computing stats for {}", file);
+ if (!file.isDirectory()) {
+ InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(partish.getInputFormatClass(), jc);
+ InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { partish.getLocation() });
+ if (file.getLen() == 0) {
+ numFiles += 1;
+ } else {
+ org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
+ try {
+ if (recordReader instanceof StatsProvidingRecordReader) {
+ StatsProvidingRecordReader statsRR;
+ statsRR = (StatsProvidingRecordReader) recordReader;
+ rawDataSize += statsRR.getStats().getRawDataSize();
+ numRows += statsRR.getStats().getRowCount();
+ fileSize += file.getLen();
+ numFiles += 1;
+ } else {
+ throw new HiveException(String.format("Unexpected file found while reading footers for: %s ", file));
+ }
+ } finally {
+ recordReader.close();
+ }
+ }
+ }
+ }
+
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+
+ parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
+ parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
+ parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
+ parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
+
+ if (partish.getPartition() != null) {
+ result = new Partition(partish.getTable(), partish.getPartition().getTPartition());
+ } else {
+ result = new Table(partish.getTable().getTTable());
+ }
+
+ String msg = partish.getSimpleName() + " stats: [" + toString(parameters) + ']';
+ LOG.debug(msg);
+ console.printInfo(msg);
+
+ } catch (Exception e) {
+ console.printInfo("[Warning] could not update stats for " + partish.getSimpleName() + ".", "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
+ }
+ }
+
+ private String toString(Map<String, String> parameters) {
+ StringBuilder builder = new StringBuilder();
+ for (String statType : StatsSetupConst.supportedStats) {
+ String value = parameters.get(statType);
+ if (value != null) {
+ if (builder.length() > 0) {
+ builder.append(", ");
+ }
+ builder.append(statType).append('=').append(value);
+ }
+ }
+ return builder.toString();
+ }
+
+ }
+
+ private int aggregateStats(ExecutorService threadPool, Hive db) {
+ int ret = 0;
+ try {
+ JobConf jc = new JobConf(conf);
+
+ TableSpec tableSpecs = work.getTableSpecs();
+
+ if (tableSpecs == null) {
+ throw new RuntimeException("this is unexpected...needs some investigation");
+ }
+
+ Table table = tableSpecs.tableHandle;
+
+ Collection<Partition> partitions = null;
+ if (work.getPartitions() == null || work.getPartitions().isEmpty()) {
+ if (table.isPartitioned()) {
+ partitions = tableSpecs.partitions;
+ }
+ } else {
+ partitions = work.getPartitions();
+ }
+
+ LinkedList<Partish> partishes = Lists.newLinkedList();
+ if (partitions == null) {
+ partishes.add(Partish.buildFor(table));
+ } else {
+ for (Partition part : partitions) {
+ partishes.add(Partish.buildFor(table, part));
+ }
+ }
+
+ List<FooterStatCollector> scs = Lists.newArrayList();
+ for (Partish partish : partishes) {
+ scs.add(new FooterStatCollector(jc, partish));
+ }
+
+ for (FooterStatCollector sc : scs) {
+ sc.init(conf, console);
+ threadPool.execute(sc);
+ }
+
+ LOG.debug("Stats collection waiting for threadpool to shutdown..");
+ shutdownAndAwaitTermination(threadPool);
+ LOG.debug("Stats collection threadpool shutdown successful.");
+
+ ret = updatePartitions(db, scs, table);
+
+ } catch (Exception e) {
+ console.printError("Failed to collect footer statistics.", "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
+ // Fail the query if the stats are supposed to be reliable
+ if (work.isStatsReliable()) {
+ ret = -1;
+ }
+ }
+
+ // The return value of 0 indicates success,
+ // anything else indicates failure
+ return ret;
+ }
+
+ private int updatePartitions(Hive db, List<FooterStatCollector> scs, Table table) throws InvalidOperationException, HiveException {
+
+ String tableFullName = table.getFullyQualifiedName();
+
+ if (scs.isEmpty()) {
+ return 0;
+ }
+ if (work.isStatsReliable()) {
+ for (FooterStatCollector statsCollection : scs) {
+ if (statsCollection.result == null) {
+ LOG.debug("Stats requested to be reliable. Empty stats found: {}", statsCollection.partish.getSimpleName());
+ return -1;
+ }
+ }
+ }
+ List<FooterStatCollector> validCollectors = Lists.newArrayList();
+ for (FooterStatCollector statsCollection : scs) {
+ if (statsCollection.isValid()) {
+ validCollectors.add(statsCollection);
+ }
+ }
+
+ EnvironmentContext environmentContext = new EnvironmentContext();
+ environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+
+ ImmutableListMultimap<String, FooterStatCollector> collectorsByTable = Multimaps.index(validCollectors, FooterStatCollector.SIMPLE_NAME_FUNCTION);
+
+ LOG.debug("Collectors.size(): {}", collectorsByTable.keySet());
+
+ if (collectorsByTable.keySet().size() < 1) {
+ LOG.warn("Collectors are empty! ; {}", tableFullName);
+ }
+
+ // for now this should be true...
+ assert (collectorsByTable.keySet().size() <= 1);
+
+ LOG.debug("Updating stats for: {}", tableFullName);
+
+ for (String partName : collectorsByTable.keySet()) {
+ ImmutableList<FooterStatCollector> values = collectorsByTable.get(partName);
+
+ if (values == null) {
+ throw new RuntimeException("very intresting");
+ }
+
+ if (values.get(0).result instanceof Table) {
+ db.alterTable(tableFullName, (Table) values.get(0).result, environmentContext);
+ LOG.debug("Updated stats for {}.", tableFullName);
+ } else {
+ if (values.get(0).result instanceof Partition) {
+ List<Partition> results = Lists.transform(values, FooterStatCollector.EXTRACT_RESULT_FUNCTION);
+ db.alterPartitions(tableFullName, results, environmentContext);
+ LOG.debug("Bulk updated {} partitions of {}.", results.size(), tableFullName);
+ } else {
+ throw new RuntimeException("inconsistent");
+ }
+ }
+ }
+ LOG.debug("Updated stats for: {}", tableFullName);
+ return 0;
+ }
+
+ private void shutdownAndAwaitTermination(ExecutorService threadPool) {
+
+ // Disable new tasks from being submitted
+ threadPool.shutdown();
+ try {
+
+ // Wait a while for existing tasks to terminate
+ // XXX this will wait forever... :)
+ while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.debug("Waiting for all stats tasks to finish...");
+ }
+ // Cancel currently executing tasks
+ threadPool.shutdownNow();
+
+ // Wait a while for tasks to respond to being cancelled
+ if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
+ LOG.debug("Stats collection thread pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+
+ // Cancel again if current thread also interrupted
+ threadPool.shutdownNow();
+
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
+ }
+}
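
Condensed to its essentials, the footer-only read described in the class comment above (and performed per file by FooterStatCollector.run()) looks like the sketch below. This is illustrative and self-contained, not code from the patch; the wrapping loop, zero-length-file handling and error reporting are omitted:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    class FooterStatsSketch {
      // Returns {numRows, rawDataSize} for one file, or null when the format does not
      // expose footer stats (in which case a full scan would be needed).
      static long[] readOneFile(InputFormat<?, ?> inputFormat, FileStatus file, JobConf jc, String location) throws Exception {
        InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { location });
        RecordReader<?, ?> reader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
        try {
          if (reader instanceof StatsProvidingRecordReader) {
            StatsProvidingRecordReader statsRR = (StatsProvidingRecordReader) reader;
            // ORC keeps these counters in the file footer, so no rows are iterated here.
            return new long[] { statsRR.getStats().getRowCount(), statsRR.getStats().getRawDataSize() };
          }
          return null;
        } finally {
          reader.close();
        }
      }
    }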
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
new file mode 100644
index 0000000..ecf3b9d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * BasicStatsTask implementation. It mainly deals with "collectable" stats: stats that
+ * require data scanning and are collected during query execution (unless the user
+ * explicitly requests data scanning just for the purpose of stats computation using the "ANALYZE"
+ * command). All other stats are computed directly by the MetaStore. The rationale is that the
+ * MetaStore layer covers all Thrift calls and provides better guarantees about the accuracy of
+ * those stats.
+ **/
+public class BasicStatsTask implements Serializable, IStatsProcessor {
+
+ private static final long serialVersionUID = 1L;
+ private static transient final Logger LOG = LoggerFactory.getLogger(BasicStatsTask.class);
+
+ private Table table;
+ private Collection<Partition> dpPartSpecs;
+ public boolean followedColStats;
+ private BasicStatsWork work;
+ private HiveConf conf;
+
+ protected transient LogHelper console;
+
+ public BasicStatsTask(HiveConf conf, BasicStatsWork work) {
+ super();
+ dpPartSpecs = null;
+ this.conf = conf;
+ console = new LogHelper(LOG);
+ this.work = work;
+ }
+
+ @Override
+ public int process(Hive db, Table tbl) throws Exception {
+
+ LOG.info("Executing stats task");
+ table = tbl;
+ return aggregateStats(db);
+ }
+
+ @Override
+ public void initialize(CompilationOpContext opContext) {
+ }
+
+ public StageType getType() {
+ return StageType.STATS;
+ }
+
+ public String getName() {
+ return "STATS";
+ }
+
+ private static class BasicStatsProcessor {
+
+ private Partish partish;
+ private FileStatus[] partfileStatus;
+ private BasicStatsWork work;
+ private boolean atomic;
+ private boolean followedColStats1;
+
+ public BasicStatsProcessor(Partish partish, BasicStatsWork work, HiveConf conf, boolean followedColStats2) {
+ this.partish = partish;
+ this.work = work;
+ atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+ followedColStats1 = followedColStats2;
+ }
+
+ public Object process(StatsAggregator statsAggregator) throws HiveException, MetaException {
+ Partish p = partish;
+ Map<String, String> parameters = p.getPartParameters();
+ if (p.isAcid()) {
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+ }
+
+ if (work.isTargetRewritten()) {
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+ }
+
+ // work.getTableSpecs() == null means this is not an ANALYZE command;
+ // if it is also not followed by column stats, we should clear the existing
+ // column stats
+ // FIXME: move this to ColStat related part
+ if (!work.isExplicitAnalyze() && !followedColStats1) {
+ StatsSetupConst.clearColumnStatsState(parameters);
+ }
+ // non-partitioned tables:
+ // XXX: I don't agree with this logic
+ // FIXME: deprecate atomic? what's its purpose?
+ if (!existStats(parameters) && atomic) {
+ return null;
+ }
+ if(partfileStatus == null){
+ LOG.warn("Partition/partfiles is null for: " + partish.getPartition().getSpec());
+ return null;
+ }
+
+ // The collectable stats for the aggregator need to be cleared.
+ // For example, if a file is being loaded, the old number of rows is no longer valid.
+ // XXX: makes no sense to me... possibly not needed anymore
+ if (work.isClearAggregatorStats()) {
+ // we choose to keep the invalid stats and only change the setting.
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+ }
+
+ updateQuickStats(parameters, partfileStatus);
+ if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
+ if (statsAggregator != null) {
+ String prefix = getAggregationPrefix(p.getTable(), p.getPartition());
+ updateStats(statsAggregator, parameters, prefix, atomic);
+ }
+ }
+
+ return p.getOutput();
+ }
+
+ public void collectFileStatus(Warehouse wh) throws MetaException {
+ Map<String, String> parameters = partish.getPartParameters();
+ if (!existStats(parameters) && atomic) {
+ return;
+ }
+ partfileStatus = wh.getFileStatusesForSD(partish.getPartSd());
+ }
+
+ @Deprecated
+ private boolean existStats(Map<String, String> parameters) {
+ return parameters.containsKey(StatsSetupConst.ROW_COUNT)
+ || parameters.containsKey(StatsSetupConst.NUM_FILES)
+ || parameters.containsKey(StatsSetupConst.TOTAL_SIZE)
+ || parameters.containsKey(StatsSetupConst.RAW_DATA_SIZE)
+ || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
+ }
+
+ private void updateQuickStats(Map<String, String> parameters, FileStatus[] partfileStatus) throws MetaException {
+ MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
+ }
+
+ private String getAggregationPrefix(Table table, Partition partition) throws MetaException {
+ String prefix = getAggregationPrefix0(table, partition);
+ String aggKey = prefix.endsWith(Path.SEPARATOR) ? prefix : prefix + Path.SEPARATOR;
+ return aggKey;
+ }
+
+ private String getAggregationPrefix0(Table table, Partition partition) throws MetaException {
+
+ // prefix is of the form dbName.tblName
+ String prefix = table.getDbName() + "." + org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.encodeTableName(table.getTableName());
+ // FIXME: this is a secret contract; reusing getAggKey() would create a closer relation to the StatsGatherer
+ // prefix = work.getAggKey();
+ prefix = prefix.toLowerCase();
+ if (partition != null) {
+ return Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+ }
+ return prefix;
+ }
+
+ private void updateStats(StatsAggregator statsAggregator, Map<String, String> parameters, String aggKey, boolean atomic) throws HiveException {
+
+ for (String statType : StatsSetupConst.statsRequireCompute) {
+ String value = statsAggregator.aggregateStats(aggKey, statType);
+ if (value != null && !value.isEmpty()) {
+ long longValue = Long.parseLong(value);
+
+ if (!work.isTargetRewritten()) {
+ String originalValue = parameters.get(statType);
+ if (originalValue != null) {
+ longValue += Long.parseLong(originalValue); // todo: invalid + valid = invalid
+ }
+ }
+ parameters.put(statType, String.valueOf(longValue));
+ } else {
+ if (atomic) {
+ throw new HiveException(ErrorMsg.STATSAGGREGATOR_MISSED_SOMESTATS, statType);
+ }
+ }
+ }
+ }
+
+ }
+
+
+ private int aggregateStats(Hive db) {
+
+ StatsAggregator statsAggregator = null;
+ int ret = 0;
+ StatsCollectionContext scc = null;
+ EnvironmentContext environmentContext = null;
+ environmentContext = new EnvironmentContext();
+ environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+
+ try {
+ // Stats setup:
+ final Warehouse wh = new Warehouse(conf);
+ if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
+ try {
+ scc = getContext();
+ statsAggregator = createStatsAggregator(scc, conf);
+ } catch (HiveException e) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+ throw e;
+ }
+ console.printError(ErrorMsg.STATS_SKIPPING_BY_ERROR.getErrorCodedMsg(e.toString()));
+ }
+ }
+
+ List<Partition> partitions = getPartitionsList(db);
+
+ String tableFullName = table.getDbName() + "." + table.getTableName();
+
+ List<Partish> partishes = new ArrayList<>();
+
+ if (partitions == null) {
+ Partish p;
+ partishes.add(p = new Partish.PTable(table));
+
+ BasicStatsProcessor basicStatsProcessor = new BasicStatsProcessor(p, work, conf, followedColStats);
+ basicStatsProcessor.collectFileStatus(wh);
+ Object res = basicStatsProcessor.process(statsAggregator);
+
+ if (res == null) {
+ return 0;
+ }
+ db.alterTable(tableFullName, (Table) res, environmentContext);
+
+ if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+ console.printInfo("Table " + tableFullName + " stats: [" + toString(p.getPartParameters()) + ']');
+ }
+ LOG.info("Table " + tableFullName + " stats: [" + toString(p.getPartParameters()) + ']');
+
+ } else {
+ // Partitioned table:
+ // Need to get the old stats of the partition
+ // and update the table stats based on the old and new stats.
+
+ List<Partition> updates = new ArrayList<Partition>();
+
+ final ExecutorService pool = buildBasicStatsExecutor();
+
+ final List<Future<Void>> futures = Lists.newLinkedList();
+ List<BasicStatsProcessor> processors = Lists.newLinkedList();
+
+ try {
+ for(final Partition partn : partitions) {
+ Partish p;
+ BasicStatsProcessor bsp = new BasicStatsProcessor(p = new Partish.PPart(table, partn), work, conf, followedColStats);
+ processors.add(bsp);
+
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ bsp.collectFileStatus(wh);
+ return null;
+ }
+ }));
+ }
+ pool.shutdown();
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("Cancelling " + futures.size() + " file stats lookup tasks");
+ //cancel other futures
+ for (Future future : futures) {
+ future.cancel(true);
+ }
+ // Fail the query if the stats are supposed to be reliable
+ if (work.isStatsReliable()) {
+ ret = 1;
+ }
+ } finally {
+ if (pool != null) {
+ pool.shutdownNow();
+ }
+ LOG.debug("Finished getting file stats of all partitions!");
+ }
+
+ for (BasicStatsProcessor basicStatsProcessor : processors) {
+ Object res = basicStatsProcessor.process(statsAggregator);
+ if (res == null) {
+ LOG.info("Partition " + basicStatsProcessor.partish.getPartition().getSpec() + " stats: [0]");
+ continue;
+ }
+ updates.add((Partition) res);
+ if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+ console.printInfo("Partition " + basicStatsProcessor.partish.getPartition().getSpec() + " stats: [" + toString(basicStatsProcessor.partish.getPartParameters()) + ']');
+ }
+ LOG.info("Partition " + basicStatsProcessor.partish.getPartition().getSpec() + " stats: [" + toString(basicStatsProcessor.partish.getPartParameters()) + ']');
+ }
+
+ if (!updates.isEmpty()) {
+ db.alterPartitions(tableFullName, updates, environmentContext);
+ }
+ if (work.isStatsReliable() && updates.size() != processors.size()) {
+ LOG.info("Stats should be reliadble...however seems like there were some issue.. => ret 1");
+ ret = 1;
+ }
+ }
+
+ } catch (Exception e) {
+ console.printInfo("[Warning] could not update stats.",
+ "Failed with exception " + e.getMessage() + "\n"
+ + StringUtils.stringifyException(e));
+
+ // Fail the query if the stats are supposed to be reliable
+ if (work.isStatsReliable()) {
+ ret = 1;
+ }
+ } finally {
+ if (statsAggregator != null) {
+ statsAggregator.closeConnection(scc);
+ }
+ }
+ // The return value of 0 indicates success,
+ // anything else indicates failure
+ return ret;
+ }
+
+ private BasicStatsWork getWork() {
+ return work;
+ }
+
+ private ExecutorService buildBasicStatsExecutor() {
+ // Get the file status up-front for all partitions; beneficial on blob storage systems.
+ int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+ // In case thread count is set to 0, use single thread.
+ poolSize = Math.max(poolSize, 1);
+ final ExecutorService pool = Executors.newFixedThreadPool(poolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("stats-updater-thread-%d").build());
+ LOG.debug("Getting file stats of all partitions. threadpool size:" + poolSize);
+ return pool;
+ }
+
+ private StatsAggregator createStatsAggregator(StatsCollectionContext scc, HiveConf conf) throws HiveException {
+ String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
+ if (factory == null) {
+ throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
+ }
+ // initialize stats publishing table for noscan which has only stats task
+ // the rest of MR task following stats task initializes it in ExecDriver.java
+ StatsPublisher statsPublisher = factory.getStatsPublisher();
+ if (!statsPublisher.init(scc)) { // creating stats table if not exists
+ throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ }
+
+ // manufacture a StatsAggregator
+ StatsAggregator statsAggregator = factory.getStatsAggregator();
+ if (!statsAggregator.connect(scc)) {
+ throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl));
+ }
+ return statsAggregator;
+ }
+
+ private StatsCollectionContext getContext() throws HiveException {
+
+ StatsCollectionContext scc = new StatsCollectionContext(conf);
+ Task sourceTask = getWork().getSourceTask();
+ if (sourceTask == null) {
+ throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
+ }
+ scc.setTask(sourceTask);
+ scc.setStatsTmpDir(this.getWork().getStatsTmpDir());
+ return scc;
+ }
+
+
+ private String toString(Map<String, String> parameters) {
+ StringBuilder builder = new StringBuilder();
+ for (String statType : StatsSetupConst.supportedStats) {
+ String value = parameters.get(statType);
+ if (value != null) {
+ if (builder.length() > 0) {
+ builder.append(", ");
+ }
+ builder.append(statType).append('=').append(value);
+ }
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Get the list of partitions that need to update statistics.
+ * TODO: we should reuse the Partitions generated at compile time
+ * since getting the list of partitions is quite expensive.
+ *
+ * @return a list of partitions that need to update statistics.
+ * @throws HiveException
+ */
+ private List<Partition> getPartitionsList(Hive db) throws HiveException {
+ if (work.getLoadFileDesc() != null) {
+ return null; //we are in CTAS, so we know there are no partitions
+ }
+
+ List<Partition> list = new ArrayList<Partition>();
+
+ if (work.getTableSpecs() != null) {
+
+ // ANALYZE command
+ TableSpec tblSpec = work.getTableSpecs();
+ table = tblSpec.tableHandle;
+ if (!table.isPartitioned()) {
+ return null;
+ }
+ // get all partitions that matches with the partition spec
+ List<Partition> partitions = tblSpec.partitions;
+ if (partitions != null) {
+ for (Partition partn : partitions) {
+ list.add(partn);
+ }
+ }
+ } else if (work.getLoadTableDesc() != null) {
+
+ // INSERT OVERWRITE command
+ LoadTableDesc tbd = work.getLoadTableDesc();
+ table = db.getTable(tbd.getTable().getTableName());
+ if (!table.isPartitioned()) {
+ return null;
+ }
+ DynamicPartitionCtx dpCtx = tbd.getDPCtx();
+ if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
+ // If no dynamic partitions are generated, dpPartSpecs may not be initialized
+ if (dpPartSpecs != null) {
+ // load the list of DP partitions and return the list of partition specs
+ list.addAll(dpPartSpecs);
+ }
+ } else { // static partition
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+ list.add(partn);
+ }
+ }
+ return list;
+ }
+
+ public Collection<Partition> getDpPartSpecs() {
+ return dpPartSpecs;
+ }
+
+ @Override
+ public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
+ this.dpPartSpecs = dpPartSpecs;
+ }
+
+}
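
The partitioned-table branch of aggregateStats() above follows a plain fan-out/fan-in pattern: one daemon pool sized by ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, one future per partition for the file listing, then a single alterPartitions() call. A stripped-down, illustrative sketch of that pattern (names are placeholders, not code from the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    class FileStatusFanOutSketch {
      // Fans out one task per partition and blocks until all finish, so that any
      // failure surfaces before stats are written back to the metastore.
      static void runAll(List<Runnable> perPartitionWork, int configuredPoolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(configuredPoolSize, 1), // 0 falls back to one thread
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("stats-updater-thread-%d").build());
        List<Future<?>> futures = new ArrayList<>();
        try {
          for (Runnable r : perPartitionWork) {
            futures.add(pool.submit(r));
          }
          pool.shutdown();
          for (Future<?> f : futures) {
            f.get(); // propagates any failure from a partition's file listing
          }
        } finally {
          pool.shutdownNow();
        }
      }
    }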
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
new file mode 100644
index 0000000..7ce7a74
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.stats.ColumnStatisticsObjTranslator;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ColStatsProcessor implements IStatsProcessor {
+ private static transient final Logger LOG = LoggerFactory.getLogger(ColStatsProcessor.class);
+
+ private FetchOperator ftOp;
+ private FetchWork fWork;
+ private ColumnStatsDesc colStatDesc;
+ private HiveConf conf;
+ private boolean isStatsReliable;
+
+ public ColStatsProcessor(ColumnStatsDesc colStats, HiveConf conf) {
+ this.conf = conf;
+ fWork = colStats.getFWork();
+ colStatDesc = colStats;
+ isStatsReliable = conf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE);
+ }
+
+ @Override
+ public void initialize(CompilationOpContext opContext) {
+ try {
+ fWork.initializeForFetch(opContext);
+ JobConf job = new JobConf(conf);
+ ftOp = new FetchOperator(fWork, job);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public int process(Hive db, Table tbl) throws Exception {
+ return persistColumnStats(db, tbl);
+ }
+
+ private List<ColumnStatistics> constructColumnStatsFromPackedRows(Table tbl1) throws HiveException, MetaException, IOException {
+
+ Table tbl = tbl1;
+
+ String partName = null;
+ List<String> colName = colStatDesc.getColName();
+ List<String> colType = colStatDesc.getColType();
+ boolean isTblLevel = colStatDesc.isTblLevel();
+
+ List<ColumnStatistics> stats = new ArrayList<ColumnStatistics>();
+ InspectableObject packedRow;
+ while ((packedRow = ftOp.getNextRow()) != null) {
+ if (packedRow.oi.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new HiveException("Unexpected object type encountered while unpacking row");
+ }
+
+ List<ColumnStatisticsObj> statsObjs = new ArrayList<ColumnStatisticsObj>();
+ StructObjectInspector soi = (StructObjectInspector) packedRow.oi;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> list = soi.getStructFieldsDataAsList(packedRow.o);
+
+ List<FieldSchema> partColSchema = tbl.getPartCols();
+ // Partition columns are appended at the end; we only care about the stats columns
+ int numOfStatCols = isTblLevel ? fields.size() : fields.size() - partColSchema.size();
+ assert list != null;
+ for (int i = 0; i < numOfStatCols; i++) {
+ StructField structField = fields.get(i);
+ String columnName = colName.get(i);
+ String columnType = colType.get(i);
+ Object values = list.get(i);
+ try {
+ ColumnStatisticsObj statObj = ColumnStatisticsObjTranslator.readHiveStruct(columnName, columnType, structField, values);
+ statsObjs.add(statObj);
+ } catch (Exception e) {
+ if (isStatsReliable) {
+ throw new HiveException("Statistics collection failed while (hive.stats.reliable)", e);
+ } else {
+ LOG.debug("Because {} is infinite or NaN, we skip stats.", columnName, e);
+ }
+ }
+ }
+
+ if (!statsObjs.isEmpty()) {
+
+ if (!isTblLevel) {
+ List<String> partVals = new ArrayList<String>();
+ // Iterate over partition columns to figure out partition name
+ for (int i = fields.size() - partColSchema.size(); i < fields.size(); i++) {
+ Object partVal = ((PrimitiveObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(list.get(i));
+ partVals.add(partVal == null ? // could be null for default partition
+ this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
+ }
+ partName = Warehouse.makePartName(partColSchema, partVals);
+ }
+
+ ColumnStatisticsDesc statsDesc = buildColumnStatsDesc(tbl, partName, isTblLevel);
+ ColumnStatistics colStats = new ColumnStatistics();
+ colStats.setStatsDesc(statsDesc);
+ colStats.setStatsObj(statsObjs);
+ stats.add(colStats);
+ }
+ }
+ ftOp.clearFetchContext();
+ return stats;
+ }
+
+ private ColumnStatisticsDesc buildColumnStatsDesc(Table table, String partName, boolean isTblLevel) {
+ String dbName = table.getDbName();
+ assert dbName != null;
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(table.getTableName());
+ statsDesc.setIsTblLevel(isTblLevel);
+
+ if (!isTblLevel) {
+ statsDesc.setPartName(partName);
+ } else {
+ statsDesc.setPartName(null);
+ }
+ return statsDesc;
+ }
+
+ public int persistColumnStats(Hive db, Table tbl) throws HiveException, MetaException, IOException {
+ // Construct a column statistics object from the result
+
+ List<ColumnStatistics> colStats = constructColumnStatsFromPackedRows(tbl);
+ // Persist the column statistics object to the metastore
+ // Note, this function is shared for both table and partition column stats.
+ if (colStats.isEmpty()) {
+ return 0;
+ }
+ SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
+ request.setNeedMerge(colStatDesc.isNeedMerge());
+ db.setPartitionColumnStatistics(request);
+ return 0;
+ }
+
+ @Override
+ public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
+ }
+
+}
\ No newline at end of file
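
One detail of constructColumnStatsFromPackedRows() worth spelling out: the fetched rows carry the stat columns first with the partition key values appended at the end, and a null key value denotes the default partition. A small illustrative sketch of the partition-name reconstruction, assuming the trailing values have already been pulled out of the packed row:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    class PartNameSketch {
      // trailingPartValues are the last partColSchema.size() fields of the packed row.
      static String partName(List<FieldSchema> partColSchema, List<Object> trailingPartValues,
          String defaultPartitionName) throws MetaException {
        List<String> partVals = new ArrayList<>();
        for (Object v : trailingPartValues) {
          // null means the row belongs to the default partition (hive.exec.default.partition.name)
          partVals.add(v == null ? defaultPartitionName : v.toString());
        }
        return Warehouse.makePartName(partColSchema, partVals); // e.g. "ds=2008-04-08/hr=12"
      }
    }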
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java
new file mode 100644
index 0000000..6485526
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+
+public class ColumnStatisticsObjTranslator {
+
+ public static ColumnStatisticsObj readHiveStruct(String columnName, String columnType, StructField structField, Object values)
+ throws HiveException
+ {
+ // Get the field objectInspector, fieldName and the field object.
+ ObjectInspector foi = structField.getFieldObjectInspector();
+ Object f = values;
+ String fieldName = structField.getFieldName();
+ ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
+ statsObj.setColName(columnName);
+ statsObj.setColType(columnType);
+ try {
+ unpackStructObject(foi, f, fieldName, statsObj);
+ return statsObj;
+ } catch (Exception e) {
+ throw new HiveException("error calculating stats for column:" + structField.getFieldName(), e);
+ }
+ }
+
+ private static void unpackBooleanStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+ long v = ((LongObjectInspector) oi).get(o);
+ if (fName.equals("counttrues")) {
+ statsObj.getStatsData().getBooleanStats().setNumTrues(v);
+ } else if (fName.equals("countfalses")) {
+ statsObj.getStatsData().getBooleanStats().setNumFalses(v);
+ } else if (fName.equals("countnulls")) {
+ statsObj.getStatsData().getBooleanStats().setNumNulls(v);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class UnsupportedDoubleException extends Exception {
+ }
+
+ private static void unpackDoubleStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
+ if (fName.equals("countnulls")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getDoubleStats().setNumNulls(v);
+ } else if (fName.equals("numdistinctvalues")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getDoubleStats().setNumDVs(v);
+ } else if (fName.equals("max")) {
+ double d = ((DoubleObjectInspector) oi).get(o);
+ if (Double.isInfinite(d) || Double.isNaN(d)) {
+ throw new UnsupportedDoubleException();
+ }
+ statsObj.getStatsData().getDoubleStats().setHighValue(d);
+ } else if (fName.equals("min")) {
+ double d = ((DoubleObjectInspector) oi).get(o);
+ if (Double.isInfinite(d) || Double.isNaN(d)) {
+ throw new UnsupportedDoubleException();
+ }
+ statsObj.getStatsData().getDoubleStats().setLowValue(d);
+ } else if (fName.equals("ndvbitvector")) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getDoubleStats().setBitVectors(buf);
+ ;
+ }
+ }
+
+ private static void unpackDecimalStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+ if (fName.equals("countnulls")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getDecimalStats().setNumNulls(v);
+ } else if (fName.equals("numdistinctvalues")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getDecimalStats().setNumDVs(v);
+ } else if (fName.equals("max")) {
+ HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getDecimalStats().setHighValue(convertToThriftDecimal(d));
+ } else if (fName.equals("min")) {
+ HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getDecimalStats().setLowValue(convertToThriftDecimal(d));
+ } else if (fName.equals("ndvbitvector")) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getDecimalStats().setBitVectors(buf);
+ ;
+ }
+ }
+
+ private static Decimal convertToThriftDecimal(HiveDecimal d) {
+ return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short) d.scale());
+ }
+
+ private static void unpackLongStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+ if (fName.equals("countnulls")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getLongStats().setNumNulls(v);
+ } else if (fName.equals("numdistinctvalues")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getLongStats().setNumDVs(v);
+ } else if (fName.equals("max")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getLongStats().setHighValue(v);
+ } else if (fName.equals("min")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getLongStats().setLowValue(v);
+ } else if (fName.equals("ndvbitvector")) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getLongStats().setBitVectors(buf);
+ ;
+ }
+ }
+
+ private static void unpackStringStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+ if (fName.equals("countnulls")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getStringStats().setNumNulls(v);
+ } else if (fName.equals("numdistinctvalues")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getStringStats().setNumDVs(v);
+ } else if (fName.equals("avglength")) {
+ double d = ((DoubleObjectInspector) oi).get(o);
+ statsObj.getStatsData().getStringStats().setAvgColLen(d);
+ } else if (fName.equals("maxlength")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getStringStats().setMaxColLen(v);
+ } else if (fName.equals("ndvbitvector")) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getStringStats().setBitVectors(buf);
+ }
+ }
+
+ private static void unpackBinaryStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+ if (fName.equals("countnulls")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getBinaryStats().setNumNulls(v);
+ } else if (fName.equals("avglength")) {
+ double d = ((DoubleObjectInspector) oi).get(o);
+ statsObj.getStatsData().getBinaryStats().setAvgColLen(d);
+ } else if (fName.equals("maxlength")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getBinaryStats().setMaxColLen(v);
+ }
+ }
+
+ private static void unpackDateStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+ if (fName.equals("countnulls")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getDateStats().setNumNulls(v);
+ } else if (fName.equals("numdistinctvalues")) {
+ long v = ((LongObjectInspector) oi).get(o);
+ statsObj.getStatsData().getDateStats().setNumDVs(v);
+ } else if (fName.equals("max")) {
+ DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
+ statsObj.getStatsData().getDateStats().setHighValue(new Date(v.getDays()));
+ } else if (fName.equals("min")) {
+ DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
+ statsObj.getStatsData().getDateStats().setLowValue(new Date(v.getDays()));
+ } else if (fName.equals("ndvbitvector")) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+ statsObj.getStatsData().getDateStats().setBitVectors(buf);
+ }
+ }
+
+ private static void unpackPrimitiveObject(ObjectInspector oi, Object o, String fieldName, ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
+ if (o == null) {
+ return;
+ }
+ // First infer the type of object
+ if (fieldName.equals("columntype")) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+ String s = ((StringObjectInspector) poi).getPrimitiveJavaObject(o);
+ ColumnStatisticsData statsData = new ColumnStatisticsData();
+
+ if (s.equalsIgnoreCase("long")) {
+ LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
+ statsData.setLongStats(longStats);
+ statsObj.setStatsData(statsData);
+ } else if (s.equalsIgnoreCase("double")) {
+ DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector();
+ statsData.setDoubleStats(doubleStats);
+ statsObj.setStatsData(statsData);
+ } else if (s.equalsIgnoreCase("string")) {
+ StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector();
+ statsData.setStringStats(stringStats);
+ statsObj.setStatsData(statsData);
+ } else if (s.equalsIgnoreCase("boolean")) {
+ BooleanColumnStatsData booleanStats = new BooleanColumnStatsData();
+ statsData.setBooleanStats(booleanStats);
+ statsObj.setStatsData(statsData);
+ } else if (s.equalsIgnoreCase("binary")) {
+ BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
+ statsData.setBinaryStats(binaryStats);
+ statsObj.setStatsData(statsData);
+ } else if (s.equalsIgnoreCase("decimal")) {
+ DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector();
+ statsData.setDecimalStats(decimalStats);
+ statsObj.setStatsData(statsData);
+ } else if (s.equalsIgnoreCase("date")) {
+ DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector();
+ statsData.setDateStats(dateStats);
+ statsObj.setStatsData(statsData);
+ }
+ } else {
+ // invoke the right unpack method depending on data type of the column
+ if (statsObj.getStatsData().isSetBooleanStats()) {
+ unpackBooleanStats(oi, o, fieldName, statsObj);
+ } else if (statsObj.getStatsData().isSetLongStats()) {
+ unpackLongStats(oi, o, fieldName, statsObj);
+ } else if (statsObj.getStatsData().isSetDoubleStats()) {
+ unpackDoubleStats(oi, o, fieldName, statsObj);
+ } else if (statsObj.getStatsData().isSetStringStats()) {
+ unpackStringStats(oi, o, fieldName, statsObj);
+ } else if (statsObj.getStatsData().isSetBinaryStats()) {
+ unpackBinaryStats(oi, o, fieldName, statsObj);
+ } else if (statsObj.getStatsData().isSetDecimalStats()) {
+ unpackDecimalStats(oi, o, fieldName, statsObj);
+ } else if (statsObj.getStatsData().isSetDateStats()) {
+ unpackDateStats(oi, o, fieldName, statsObj);
+ }
+ }
+ }
+
+ private static void unpackStructObject(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj cStatsObj) throws UnsupportedDoubleException {
+ if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new RuntimeException("Invalid object datatype : " + oi.getCategory().toString());
+ }
+
+ StructObjectInspector soi = (StructObjectInspector) oi;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> list = soi.getStructFieldsDataAsList(o);
+
+ for (int i = 0; i < fields.size(); i++) {
+ // Get the field objectInspector, fieldName and the field object.
+ ObjectInspector foi = fields.get(i).getFieldObjectInspector();
+ Object f = (list == null ? null : list.get(i));
+ String fieldName = fields.get(i).getFieldName();
+
+ if (foi.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ unpackPrimitiveObject(foi, f, fieldName, cStatsObj);
+ } else {
+ unpackStructObject(foi, f, fieldName, cStatsObj);
+ }
+ }
+ }
+}
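Note: the unpack* methods above populate the Thrift ColumnStatisticsObj that is later persisted to the metastore. A rough, hand-built sketch of the shape they produce for a single bigint column (illustrative only, not part of this patch; the column name and the numbers are made up):

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

public class LongColStatsSketch {
  public static ColumnStatisticsObj buildExample() {
    // unpackPrimitiveObject() first sees the "columntype" field emitted by
    // compute_stats(); for "long" it allocates the long stats container.
    ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
    statsObj.setColName("key");      // hypothetical column
    statsObj.setColType("bigint");

    // The remaining struct fields then fill in the counters, field by field,
    // just as unpackLongStats() does above.
    LongColumnStatsData longStats = new LongColumnStatsData();
    longStats.setNumNulls(0L);       // "countnulls"
    longStats.setNumDVs(309L);       // "numdistinctvalues"
    longStats.setLowValue(0L);       // "min"
    longStats.setHighValue(498L);    // "max"

    ColumnStatisticsData data = new ColumnStatisticsData();
    data.setLongStats(longStats);
    statsObj.setStatsData(data);
    return statsObj;
  }
}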
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java
new file mode 100644
index 0000000..04219b5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+public interface IStatsProcessor {
+
+ void initialize(CompilationOpContext opContext);
+
+ int process(Hive db, Table tbl) throws Exception;
+
+ void setDpPartSpecs(Collection<Partition> dpPartSpecs);
+
+}
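IStatsProcessor defines the per-processor hook the stats task drives. A hypothetical no-op implementation, shown only to illustrate the contract (it is not part of this patch), could look like:

package org.apache.hadoop.hive.ql.stats;

import java.util.Collection;

import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;

// Hypothetical example only.
public class NoopStatsProcessor implements IStatsProcessor {

  @Override
  public void initialize(CompilationOpContext opContext) {
    // nothing to set up before execution
  }

  @Override
  public int process(Hive db, Table tbl) throws Exception {
    // by Hive task convention, returning 0 signals success
    return 0;
  }

  @Override
  public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
    // no dynamic-partition state to track
  }
}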
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
new file mode 100644
index 0000000..e8d3184
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+
+/**
+ * Wrapper class to make it easier to apply modifications to partitions/tables.
+ */
+public abstract class Partish {
+
+ public static Partish buildFor(Table table) {
+ return new PTable(table);
+ }
+
+ public static Partish buildFor(Partition part) {
+ return new PPart(part.getTable(), part);
+ }
+
+ public static Partish buildFor(Table table, Partition part) {
+ return new PPart(table, part);
+ }
+
+ // TODO: rename this method
+ @Deprecated
+ public final boolean isAcid() {
+ return AcidUtils.isFullAcidTable(getTable());
+ }
+
+ public abstract Table getTable();
+
+ public abstract Map<String, String> getPartParameters();
+
+ public abstract StorageDescriptor getPartSd();
+
+ public abstract Object getOutput() throws HiveException;
+
+ public abstract Partition getPartition();
+
+ public abstract Class<? extends InputFormat> getInputFormatClass() throws HiveException;
+
+ public abstract Class<? extends OutputFormat> getOutputFormatClass() throws HiveException;
+
+ public abstract String getLocation();
+
+ public abstract String getSimpleName();
+
+ public final String getPartishType() {
+ return getClass().getSimpleName();
+ }
+
+ static class PTable extends Partish {
+ private Table table;
+
+ public PTable(Table table) {
+ this.table = table;
+ }
+
+ @Override
+ public Table getTable() {
+ return table;
+ }
+
+ @Override
+ public Map<String, String> getPartParameters() {
+ return table.getTTable().getParameters();
+ }
+
+ @Override
+ public StorageDescriptor getPartSd() {
+ return table.getTTable().getSd();
+ }
+
+ @Override
+ public Object getOutput() throws HiveException {
+ return new Table(getTable().getTTable());
+ }
+
+ @Override
+ public Partition getPartition() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return table.getInputFormatClass();
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return table.getOutputFormatClass();
+ }
+
+ @Override
+ public String getLocation() {
+ return table.getDataLocation().toString();
+ }
+
+ @Override
+ public String getSimpleName() {
+ return String.format("Table %s.%s", table.getDbName(), table.getTableName());
+ }
+ }
+
+ static class PPart extends Partish {
+ private Table table;
+ private Partition partition;
+
+ // FIXME: possibly the distinction between table/partition is not needed; however it was like this before... will change it later
+ public PPart(Table table, Partition partition) {
+ this.table = table;
+ this.partition = partition;
+ }
+
+ @Override
+ public Table getTable() {
+ return table;
+ }
+
+ @Override
+ public Map<String, String> getPartParameters() {
+ return partition.getTPartition().getParameters();
+ }
+
+ @Override
+ public StorageDescriptor getPartSd() {
+ return partition.getTPartition().getSd();
+ }
+
+ @Override
+ public Object getOutput() throws HiveException {
+ return new Partition(table, partition.getTPartition());
+ }
+
+ @Override
+ public Partition getPartition() {
+ return partition;
+ }
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() throws HiveException {
+ return partition.getInputFormatClass();
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() throws HiveException {
+ return partition.getOutputFormatClass();
+ }
+
+ @Override
+ public String getLocation() {
+ return partition.getLocation();
+ }
+
+ @Override
+ public String getSimpleName() {
+ return String.format("Partition %s.%s %s", table.getDbName(), table.getTableName(), partition.getSpec());
+ }
+
+ }
+
+}
\ No newline at end of file
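Partish hides whether stats are being read from a whole table or from a single partition. A usage sketch (illustrative only; the table and partition objects and the "numRows" parameter key are assumptions, not something this patch adds):

import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.stats.Partish;

public class PartishUsageSketch {
  // 'table' and 'partition' are assumed to come from the metadata layer.
  static void printBasicStats(Table table, Partition partition) {
    Partish whole = Partish.buildFor(table);            // unpartitioned view
    Partish piece = Partish.buildFor(table, partition); // single-partition view

    for (Partish p : Arrays.asList(whole, piece)) {
      // Basic stats (e.g. numRows) live in the table/partition parameters.
      Map<String, String> params = p.getPartParameters();
      System.out.println(p.getSimpleName() + " at " + p.getLocation()
          + " numRows=" + params.get("numRows"));
    }
  }
}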
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 2c76f79..149a9ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -835,4 +835,4 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
Assert.assertEquals(stringifyValues(expected), r);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2faf098..b877253 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -132,6 +132,7 @@ public class TestTxnCommands2 {
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb(hiveConf);
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 3e4f6f6..8737369 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -80,6 +80,7 @@ public abstract class TxnCommandsBaseForTests {
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb(hiveConf);
File f = new File(getWarehouseDir());
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
index d0a9982..4c865e0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
@@ -92,7 +92,7 @@ public class TestHiveReduceExpressionsWithStatsRule {
builder = HiveRelFactories.HIVE_BUILDER.create(optCluster, schemaMock);
- StatsSetupConst.setStatsStateForCreateTable(tableParams, Lists.newArrayList("_int"), "TRUE");
+ StatsSetupConst.setStatsStateForCreateTable(tableParams, Lists.newArrayList("_int"), StatsSetupConst.TRUE);
tableParams.put(StatsSetupConst.ROW_COUNT, "3");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/autoColumnStats_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_1.q b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
index 7955b07..cc32393 100644
--- a/ql/src/test/queries/clientpositive/autoColumnStats_1.q
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
@@ -60,6 +60,8 @@ drop table nzhang_part14;
create table if not exists nzhang_part14 (key string)
partitioned by (value string);
+desc formatted nzhang_part14;
+
insert overwrite table nzhang_part14 partition(value)
select key, value from (
select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a
@@ -69,6 +71,8 @@ select key, value from (
select * from (select 'k3' as key, ' ' as value from src limit 2)c
) T;
+desc formatted nzhang_part14 partition (value=' ');
+
explain select key from nzhang_part14;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/autoColumnStats_10.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_10.q b/ql/src/test/queries/clientpositive/autoColumnStats_10.q
new file mode 100644
index 0000000..bf166d8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_10.q
@@ -0,0 +1,52 @@
+set hive.mapred.mode=nonstrict;
+set hive.stats.column.autogather=true;
+
+drop table p;
+
+CREATE TABLE p(insert_num int, c1 tinyint, c2 smallint);
+
+desc formatted p;
+
+insert into p values (1,22,333);
+
+desc formatted p;
+
+alter table p replace columns (insert_num int, c1 STRING, c2 STRING);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
+
+insert into p values (2,11,111);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
+
+set hive.stats.column.autogather=false;
+
+drop table p;
+
+CREATE TABLE p(insert_num int, c1 tinyint, c2 smallint);
+
+desc formatted p;
+
+insert into p values (1,22,333);
+
+desc formatted p;
+
+alter table p replace columns (insert_num int, c1 STRING, c2 STRING);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
+
+insert into p values (2,11,111);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/autoColumnStats_5a.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_5a.q b/ql/src/test/queries/clientpositive/autoColumnStats_5a.q
new file mode 100644
index 0000000..a8bce18
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_5a.q
@@ -0,0 +1,30 @@
+set hive.stats.column.autogather=true;
+set hive.mapred.mode=nonstrict;
+set hive.cli.print.header=true;
+SET hive.exec.schema.evolution=true;
+SET hive.vectorized.execution.enabled=false;
+set hive.fetch.task.conversion=none;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE partitioned1(a INT, b STRING) PARTITIONED BY(part INT) STORED AS TEXTFILE;
+
+explain extended
+insert into table partitioned1 partition(part=1) values(1, 'original');
+
+insert into table partitioned1 partition(part=1) values(1, 'original');
+
+desc formatted partitioned1 partition(part=1);
+
+explain extended
+insert into table partitioned1 partition(part=1) values(2, 'original'), (3, 'original'),(4, 'original');
+
+insert into table partitioned1 partition(part=1) values(2, 'original'), (3, 'original'),(4, 'original');
+
+explain insert into table partitioned1 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+desc formatted partitioned1;
+desc formatted partitioned1 partition(part=1);
+desc formatted partitioned1 partition(part=1) a;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/basicstat_partval.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/basicstat_partval.q b/ql/src/test/queries/clientpositive/basicstat_partval.q
new file mode 100644
index 0000000..2db472d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/basicstat_partval.q
@@ -0,0 +1,12 @@
+set hive.stats.autogather=true;
+
+CREATE TABLE p1(i int) partitioned by (p string);
+
+insert into p1 partition(p='a') values (1);
+insert into p1 partition(p='A') values (2),(3);
+
+describe formatted p1;
+describe formatted p1 partition(p='a');
+describe formatted p1 partition(p='A');
+
+
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/columnstats_partlvl.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/columnstats_partlvl.q b/ql/src/test/queries/clientpositive/columnstats_partlvl.q
index 2c92dfe..4283bca 100644
--- a/ql/src/test/queries/clientpositive/columnstats_partlvl.q
+++ b/ql/src/test/queries/clientpositive/columnstats_partlvl.q
@@ -14,6 +14,8 @@ explain extended
analyze table Employee_Part partition (employeeSalary=2000.0) compute statistics for columns employeeID;
analyze table Employee_Part partition (employeeSalary=2000.0) compute statistics for columns employeeID;
+describe formatted Employee_Part partition(employeeSalary=2000.0);
+
explain
analyze table Employee_Part partition (employeeSalary=4000.0) compute statistics for columns employeeID;
explain extended
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q b/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
index ead9a2d..c065edd 100644
--- a/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
+++ b/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
@@ -16,6 +16,8 @@ explain
analyze table Employee_Part partition (employeeSalary='4000.0', country) compute statistics for columns employeeName, employeeID;
analyze table Employee_Part partition (employeeSalary='4000.0', country) compute statistics for columns employeeName, employeeID;
+describe formatted Employee_Part partition (employeeSalary='4000.0', country='USA');
+
describe formatted Employee_Part partition (employeeSalary='4000.0', country='USA') employeeName;
-- don't specify all partitioning keys
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/deleteAnalyze.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/deleteAnalyze.q b/ql/src/test/queries/clientpositive/deleteAnalyze.q
index 26123a6..5293ddf 100644
--- a/ql/src/test/queries/clientpositive/deleteAnalyze.q
+++ b/ql/src/test/queries/clientpositive/deleteAnalyze.q
@@ -20,6 +20,8 @@ describe formatted testdeci2 amount;
analyze table testdeci2 compute statistics for columns;
+describe formatted testdeci2;
+
set hive.stats.fetch.column.stats=true;
analyze table testdeci2 compute statistics for columns;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q b/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
index ceacc24..a89b707 100644
--- a/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
+++ b/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
@@ -1,5 +1,7 @@
set hive.exec.parallel=true;
-explain analyze table src compute statistics for columns;
+create table t as select * from src;
-analyze table src compute statistics for columns;
\ No newline at end of file
+explain analyze table t compute statistics for columns;
+
+analyze table t compute statistics for columns;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/outer_reference_windowed.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/outer_reference_windowed.q b/ql/src/test/queries/clientpositive/outer_reference_windowed.q
index cac6b41..3259ebe 100644
--- a/ql/src/test/queries/clientpositive/outer_reference_windowed.q
+++ b/ql/src/test/queries/clientpositive/outer_reference_windowed.q
@@ -34,6 +34,8 @@ ANALYZE TABLE e011_03 COMPUTE STATISTICS FOR COLUMNS;
set hive.explain.user=false;
+describe formatted e011_01;
+
explain select sum(sum(c1)) over() from e011_01;
select sum(sum(c1)) over() from e011_01;
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/smb_mapjoin_1.q b/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
index baf1690..b2394ad 100644
--- a/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
+++ b/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
@@ -12,6 +12,9 @@ load data local inpath '../../data/files/smbbucket_1.rc' overwrite into table sm
load data local inpath '../../data/files/smbbucket_2.rc' overwrite into table smb_bucket_2;
load data local inpath '../../data/files/smbbucket_3.rc' overwrite into table smb_bucket_3;
+desc formatted smb_bucket_1;
+select count(*) from smb_bucket_1;
+
set hive.cbo.enable=false;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;