Posted to commits@hive.apache.org by ha...@apache.org on 2017/11/07 22:33:56 UTC

[19/22] hive git commit: HIVE-16827 : Merge stats task and column stats task into a single task (Zoltan Haindrich via Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index 74629d5..b2bd465 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -19,136 +19,153 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
- * ConditionalStats.
+ * Stats work: may include basic stats work and a column stats descriptor
  *
  */
-@Explain(displayName = "Stats-Aggr Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+@Explain(displayName = "Stats Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class StatsWork implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private TableSpec tableSpecs;         // source table spec -- for TableScanOperator
-  private LoadTableDesc loadTableDesc;  // same as MoveWork.loadTableDesc -- for FileSinkOperator
-  private LoadFileDesc loadFileDesc;    // same as MoveWork.loadFileDesc -- for FileSinkOperator
-  private String aggKey;                // aggregation key prefix
-  private boolean statsReliable;        // are stats completely reliable
-
-  // If stats aggregator is not present, clear the current aggregator stats.
-  // For eg. if a merge is being performed, stats already collected by aggregator (numrows etc.)
-  // are still valid. However, if a load file is being performed, the old stats collected by
-  // aggregator are not valid. It might be a good idea to clear them instead of leaving wrong
-  // and old stats.
-  // Since HIVE-12661, we maintain the old stats (although may be wrong) for CBO
-  // purpose. We use a flag COLUMN_STATS_ACCURATE to
-  // show the accuracy of the stats.
 
-  private boolean clearAggregatorStats = false;
-
-  private boolean noStatsAggregator = false;
+  private static final long serialVersionUID = 1L;
+  // this is for basic stats
+  private BasicStatsWork basicStatsWork;
+  private BasicStatsNoJobWork basicStatsNoJobWork;
+  private ColumnStatsDesc colStats;
+  private static final int LIMIT = -1;
 
-  private boolean isNoScanAnalyzeCommand = false;
+  private String currentDatabase;
+  private boolean statsReliable;
+  private Table table;
+  private boolean truncate;
+  private boolean footerScan;
+  private Set<Partition> partitions = new HashSet<>();
 
-  // sourceTask for TS is not changed (currently) but that of FS might be changed
-  // by various optimizers (auto.convert.join, for example)
-  // so this is set by DriverContext in runtime
-  private transient Task sourceTask;
+  public StatsWork(Table table, BasicStatsWork basicStatsWork, HiveConf hconf) {
+    super();
+    this.table = table;
+    this.basicStatsWork = basicStatsWork;
+    this.currentDatabase = SessionState.get().getCurrentDatabase();
+    statsReliable = hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE);
+    basicStatsWork.setStatsReliable(statsReliable);
+  }
 
-  // used by FS based stats collector
-  private String statsTmpDir;
+  public StatsWork(Table table, HiveConf hconf) {
+    super();
+    this.table = table;
+    this.currentDatabase = SessionState.get().getCurrentDatabase();
+    statsReliable = hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE);
+  }
 
-  public StatsWork() {
+  @Override
+  public String toString() {
+    return String.format("StatWork; fetch: %s", getfWork());
   }
 
-  public StatsWork(TableSpec tableSpecs) {
-    this.tableSpecs = tableSpecs;
+  FetchWork getfWork() {
+    return colStats == null ? null : colStats.getFWork();
   }
 
-  public StatsWork(LoadTableDesc loadTableDesc) {
-    this.loadTableDesc = loadTableDesc;
+  @Explain(displayName = "Column Stats Desc")
+  public ColumnStatsDesc getColStats() {
+    return colStats;
   }
 
-  public StatsWork(LoadFileDesc loadFileDesc) {
-    this.loadFileDesc = loadFileDesc;
+  public void setColStats(ColumnStatsDesc colStats) {
+    this.colStats = colStats;
   }
 
-  public TableSpec getTableSpecs() {
-    return tableSpecs;
+  // unused; kept for unknown reasons
+  @Deprecated
+  public static int getLimit() {
+    return LIMIT;
   }
 
-  public LoadTableDesc getLoadTableDesc() {
-    return loadTableDesc;
+  @Explain(displayName = "Basic Stats Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public BasicStatsWork getBasicStatsWork() {
+    return basicStatsWork;
   }
 
-  public LoadFileDesc getLoadFileDesc() {
-    return loadFileDesc;
+  // only explain uses it
+  @Explain(displayName = "Basic Stats NoJob Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public BasicStatsNoJobWork getBasicStatsNoJobWork() {
+    return basicStatsNoJobWork;
   }
 
-  public void setAggKey(String aggK) {
-    aggKey = aggK;
+  public void setSourceTask(Task<?> sourceTask) {
+    basicStatsWork.setSourceTask(sourceTask);
   }
 
-  @Explain(displayName = "Stats Aggregation Key Prefix", explainLevels = { Level.EXTENDED })
-  public String getAggKey() {
-    return aggKey;
+  public String getCurrentDatabaseName() {
+    return currentDatabase;
   }
 
-  public String getStatsTmpDir() {
-    return statsTmpDir;
+  public boolean hasColStats() {
+    return colStats != null;
   }
 
-  public void setStatsTmpDir(String statsTmpDir) {
-    this.statsTmpDir = statsTmpDir;
+  public Table getTable() {
+    return table;
   }
 
-  public boolean getNoStatsAggregator() {
-    return noStatsAggregator;
+  public void collectStatsFromAggregator(IStatsGatherDesc conf) {
+    // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+    // in FileSinkDesc is used for stats publishing. They should be consistent.
+    basicStatsWork.setAggKey(conf.getStatsAggPrefix());
+    basicStatsWork.setStatsTmpDir(conf.getTmpStatsDir());
+    basicStatsWork.setStatsReliable(statsReliable);
   }
 
-  public void setNoStatsAggregator(boolean noStatsAggregator) {
-    this.noStatsAggregator = noStatsAggregator;
+  public void truncateExisting(boolean truncate) {
+    this.truncate = truncate;
   }
 
-  public boolean isStatsReliable() {
-    return statsReliable;
+
+  public void setFooterScan() {
+    basicStatsNoJobWork = new BasicStatsNoJobWork(table.getTableSpec());
+    basicStatsNoJobWork.setStatsReliable(getStatsReliable());
+    footerScan = true;
   }
 
-  public void setStatsReliable(boolean statsReliable) {
-    this.statsReliable = statsReliable;
+  public void addInputPartitions(Set<Partition> partitions) {
+    this.partitions.addAll(partitions);
   }
 
-  public boolean isClearAggregatorStats() {
-    return clearAggregatorStats;
+  public Set<Partition> getPartitions() {
+    return partitions;
   }
 
-  public void setClearAggregatorStats(boolean clearAggregatorStats) {
-    this.clearAggregatorStats = clearAggregatorStats;
+  public boolean isFooterScan() {
+    return footerScan;
   }
 
-  /**
-   * @return the isNoScanAnalyzeCommand
-   */
-  public boolean isNoScanAnalyzeCommand() {
-    return isNoScanAnalyzeCommand;
+  public boolean getStatsReliable() {
+    return statsReliable;
   }
 
-  /**
-   * @param isNoScanAnalyzeCommand the isNoScanAnalyzeCommand to set
-   */
-  public void setNoScanAnalyzeCommand(boolean isNoScanAnalyzeCommand) {
-    this.isNoScanAnalyzeCommand = isNoScanAnalyzeCommand;
+  public String getFullTableName() {
+    return table.getDbName() + "." + table.getTableName();
   }
 
   public Task getSourceTask() {
-    return sourceTask;
+    return basicStatsWork == null ? null : basicStatsWork.getSourceTask();
+  }
+
+  public String getAggKey() {
+    return basicStatsWork.getAggKey();
   }
 
-  public void setSourceTask(Task sourceTask) {
-    this.sourceTask = sourceTask;
+  public boolean isAggregating() {
+    return basicStatsWork != null && basicStatsWork.getAggKey() != null;
   }
 }
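
For reference, here is a minimal sketch of how the reworked StatsWork above might be assembled by a caller. The helper class, method and parameter names are illustrative only; the real wiring lives in planner code that is not part of this hunk, and the noScan flag merely stands in for however the ANALYZE ... NOSCAN case is detected there.

    import java.util.Set;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
    import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
    import org.apache.hadoop.hive.ql.plan.StatsWork;

    public class StatsWorkSketch {
      // Illustrative only: basic stats, column stats and the footer-scan (noscan) case all end up
      // in the single StatsWork introduced above. Requires an active SessionState, since the
      // StatsWork constructor records the current database.
      static StatsWork buildStatsWork(Table table, BasicStatsWork basicWork, ColumnStatsDesc colStatsDesc,
          Set<Partition> parts, boolean noScan, HiveConf conf) {
        StatsWork statsWork = new StatsWork(table, basicWork, conf);
        if (colStatsDesc != null) {
          statsWork.setColStats(colStatsDesc);   // column stats now ride along in the same work object
        }
        if (noScan) {
          statsWork.setFooterScan();             // gather basic stats from file footers, no job needed
        }
        if (parts != null) {
          statsWork.addInputPartitions(parts);
        }
        return statsWork;
      }
    }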

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 75d0f43..237c8cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
  * things will be added here as table scan is invoked as part of local work.
  **/
 @Explain(displayName = "TableScan", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class TableScanDesc extends AbstractOperatorDesc {
+public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherDesc {
   private static final long serialVersionUID = 1L;
 
   private String alias;
@@ -263,11 +263,13 @@ public class TableScanDesc extends AbstractOperatorDesc {
     this.gatherStats = gatherStats;
   }
 
+  @Override
   @Explain(displayName = "GatherStats", explainLevels = { Level.EXTENDED })
   public boolean isGatherStats() {
     return gatherStats;
   }
 
+  @Override
   public String getTmpStatsDir() {
     return tmpStatsDir;
   }
@@ -296,6 +298,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
     statsAggKeyPrefix = k;
   }
 
+  @Override
   @Explain(displayName = "Statistics Aggregation Key Prefix", explainLevels = { Level.EXTENDED })
   public String getStatsAggPrefix() {
     return statsAggKeyPrefix;
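
The IStatsGatherDesc interface that TableScanDesc now implements is introduced elsewhere in this patch and does not appear in this message. Judging from the methods that gain @Override above and from StatsWork.collectStatsFromAggregator(IStatsGatherDesc), its shape is presumably close to the following; the actual declaration may differ.

    package org.apache.hadoop.hive.ql.plan;

    // Presumed shape only, inferred from the overridden methods above; the real interface is
    // defined in another part of this patch and may carry additional members.
    public interface IStatsGatherDesc {
      boolean isGatherStats();
      String getTmpStatsDir();
      String getStatsAggPrefix();
    }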

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
new file mode 100644
index 0000000..d1f7652
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.plan.BasicStatsNoJobWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
+
+/**
+ * BasicStatsNoJobTask is used when stats collection is the only task for the given query (there
+ * is no parent MR or Tez job). It handles ANALYZE ... NOSCAN for file formats that implement the
+ * StatsProvidingRecordReader interface: ORC, for example, stores statistics for all columns in
+ * the file footer, so it is much faster to compute table/partition statistics by reading the
+ * footer than by scanning all the rows. This task computes basic stats such as numFiles, numRows,
+ * fileSize and rawDataSize from the footer.
+ **/
+public class BasicStatsNoJobTask implements IStatsProcessor {
+
+  private static transient final Logger LOG = LoggerFactory.getLogger(BasicStatsNoJobTask.class);
+  private HiveConf conf;
+
+  private BasicStatsNoJobWork work;
+  private LogHelper console;
+
+  public BasicStatsNoJobTask(HiveConf conf, BasicStatsNoJobWork work) {
+    this.conf = conf;
+    this.work = work;
+    console = new LogHelper(LOG);
+  }
+
+
+  @Override
+  public void initialize(CompilationOpContext opContext) {
+
+  }
+
+  @Override
+  public int process(Hive db, Table tbl) throws Exception {
+
+    LOG.info("Executing stats (no job) task");
+
+    ExecutorService threadPool = StatsTask.newThreadPool(conf);
+
+    return aggregateStats(threadPool, db);
+  }
+
+  public StageType getType() {
+    return StageType.STATS;
+  }
+
+  public String getName() {
+    return "STATS-NO-JOB";
+  }
+
+  static class StatItem {
+    Partish partish;
+    Map<String, String> params;
+    Object result;
+  }
+
+  static class FooterStatCollector implements Runnable {
+
+    private Partish partish;
+    private Object result;
+    private JobConf jc;
+    private Path dir;
+    private FileSystem fs;
+    private LogHelper console;
+
+    public FooterStatCollector(JobConf jc, Partish partish) {
+      this.jc = jc;
+      this.partish = partish;
+    }
+
+    public static final Function<FooterStatCollector, String> SIMPLE_NAME_FUNCTION = new Function<FooterStatCollector, String>() {
+
+      @Override
+      public String apply(FooterStatCollector sc) {
+        return String.format("%s#%s", sc.partish.getTable().getCompleteName(), sc.partish.getPartishType());
+      }
+    };
+    private static final Function<FooterStatCollector, Partition> EXTRACT_RESULT_FUNCTION = new Function<FooterStatCollector, Partition>() {
+      @Override
+      public Partition apply(FooterStatCollector input) {
+        return (Partition) input.result;
+      }
+    };
+
+    private boolean isValid() {
+      return result != null;
+    }
+
+    public void init(HiveConf conf, LogHelper console) throws IOException {
+      this.console = console;
+      dir = new Path(partish.getPartSd().getLocation());
+      fs = dir.getFileSystem(conf);
+    }
+
+    @Override
+    public void run() {
+
+      Map<String, String> parameters = partish.getPartParameters();
+      try {
+        long numRows = 0;
+        long rawDataSize = 0;
+        long fileSize = 0;
+        long numFiles = 0;
+        LOG.debug("Aggregating stats for {}", dir);
+        FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
+
+        for (FileStatus file : fileList) {
+          LOG.debug("Computing stats for {}", file);
+          if (!file.isDirectory()) {
+            InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(partish.getInputFormatClass(), jc);
+            InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { partish.getLocation() });
+            if (file.getLen() == 0) {
+              numFiles += 1;
+            } else {
+              org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
+              try {
+                if (recordReader instanceof StatsProvidingRecordReader) {
+                  StatsProvidingRecordReader statsRR;
+                  statsRR = (StatsProvidingRecordReader) recordReader;
+                  rawDataSize += statsRR.getStats().getRawDataSize();
+                  numRows += statsRR.getStats().getRowCount();
+                  fileSize += file.getLen();
+                  numFiles += 1;
+                } else {
+                  throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
+                }
+              } finally {
+                recordReader.close();
+              }
+            }
+          }
+        }
+
+        StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+
+        parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
+        parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
+        parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
+        parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
+
+        if (partish.getPartition() != null) {
+          result = new Partition(partish.getTable(), partish.getPartition().getTPartition());
+        } else {
+          result = new Table(partish.getTable().getTTable());
+        }
+
+        String msg = partish.getSimpleName() + " stats: [" + toString(parameters) + ']';
+        LOG.debug(msg);
+        console.printInfo(msg);
+
+      } catch (Exception e) {
+        console.printInfo("[Warning] could not update stats for " + partish.getSimpleName() + ".", "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
+      }
+    }
+
+    private String toString(Map<String, String> parameters) {
+      StringBuilder builder = new StringBuilder();
+      for (String statType : StatsSetupConst.supportedStats) {
+        String value = parameters.get(statType);
+        if (value != null) {
+          if (builder.length() > 0) {
+            builder.append(", ");
+          }
+          builder.append(statType).append('=').append(value);
+        }
+      }
+      return builder.toString();
+    }
+
+  }
+
+  private int aggregateStats(ExecutorService threadPool, Hive db) {
+    int ret = 0;
+    try {
+      JobConf jc = new JobConf(conf);
+
+      TableSpec tableSpecs = work.getTableSpecs();
+
+      if (tableSpecs == null) {
+        throw new RuntimeException("this is unexpected...needs some investigation");
+      }
+
+      Table table = tableSpecs.tableHandle;
+
+      Collection<Partition> partitions = null;
+      if (work.getPartitions() == null || work.getPartitions().isEmpty()) {
+        if (table.isPartitioned()) {
+          partitions = tableSpecs.partitions;
+        }
+      } else {
+        partitions = work.getPartitions();
+      }
+
+      LinkedList<Partish> partishes = Lists.newLinkedList();
+      if (partitions == null) {
+        partishes.add(Partish.buildFor(table));
+      } else {
+        for (Partition part : partitions) {
+          partishes.add(Partish.buildFor(table, part));
+        }
+      }
+
+      List<FooterStatCollector> scs = Lists.newArrayList();
+      for (Partish partish : partishes) {
+        scs.add(new FooterStatCollector(jc, partish));
+      }
+
+      for (FooterStatCollector sc : scs) {
+        sc.init(conf, console);
+        threadPool.execute(sc);
+      }
+
+      LOG.debug("Stats collection waiting for threadpool to shutdown..");
+      shutdownAndAwaitTermination(threadPool);
+      LOG.debug("Stats collection threadpool shutdown successful.");
+
+      ret = updatePartitions(db, scs, table);
+
+    } catch (Exception e) {
+      console.printError("Failed to collect footer statistics.", "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
+      // Fail the query if the stats are supposed to be reliable
+      if (work.isStatsReliable()) {
+        ret = -1;
+      }
+    }
+
+    // The return value of 0 indicates success,
+    // anything else indicates failure
+    return ret;
+  }
+
+  private int updatePartitions(Hive db, List<FooterStatCollector> scs, Table table) throws InvalidOperationException, HiveException {
+
+    String tableFullName = table.getFullyQualifiedName();
+
+    if (scs.isEmpty()) {
+      return 0;
+    }
+    if (work.isStatsReliable()) {
+      for (FooterStatCollector statsCollection : scs) {
+        if (statsCollection.result == null) {
+          LOG.debug("Stats requested to be reliable. Empty stats found: {}", statsCollection.partish.getSimpleName());
+          return -1;
+        }
+      }
+    }
+    List<FooterStatCollector> validColectors = Lists.newArrayList();
+    for (FooterStatCollector statsCollection : scs) {
+      if (statsCollection.isValid()) {
+        validCollectors.add(statsCollection);
+      }
+    }
+
+    EnvironmentContext environmentContext = new EnvironmentContext();
+    environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+
+    ImmutableListMultimap<String, FooterStatCollector> collectorsByTable = Multimaps.index(validCollectors, FooterStatCollector.SIMPLE_NAME_FUNCTION);
+
+    LOG.debug("Collectors.size(): {}", collectorsByTable.keySet());
+
+    if (collectorsByTable.keySet().size() < 1) {
+      LOG.warn("Collectors are empty! ; {}", tableFullName);
+    }
+
+    // for now this should be true...
+    assert (collectorsByTable.keySet().size() <= 1);
+
+    LOG.debug("Updating stats for: {}", tableFullName);
+
+    for (String partName : collectorsByTable.keySet()) {
+      ImmutableList<FooterStatCollector> values = collectorsByTable.get(partName);
+
+      if (values == null) {
+        throw new RuntimeException("very intresting");
+      }
+
+      if (values.get(0).result instanceof Table) {
+        db.alterTable(tableFullName, (Table) values.get(0).result, environmentContext);
+        LOG.debug("Updated stats for {}.", tableFullName);
+      } else {
+        if (values.get(0).result instanceof Partition) {
+          List<Partition> results = Lists.transform(values, FooterStatCollector.EXTRACT_RESULT_FUNCTION);
+          db.alterPartitions(tableFullName, results, environmentContext);
+          LOG.debug("Bulk updated {} partitions of {}.", results.size(), tableFullName);
+        } else {
+          throw new RuntimeException("inconsistent");
+        }
+      }
+    }
+    LOG.debug("Updated stats for: {}", tableFullName);
+    return 0;
+  }
+
+  private void shutdownAndAwaitTermination(ExecutorService threadPool) {
+
+    // Disable new tasks from being submitted
+    threadPool.shutdown();
+    try {
+
+      // Wait a while for existing tasks to terminate
+      // XXX this will wait forever... :)
+      while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+        LOG.debug("Waiting for all stats tasks to finish...");
+      }
+      // Cancel currently executing tasks
+      threadPool.shutdownNow();
+
+      // Wait a while for tasks to respond to being cancelled
+      if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
+        LOG.debug("Stats collection thread pool did not terminate");
+      }
+    } catch (InterruptedException ie) {
+
+      // Cancel again if current thread also interrupted
+      threadPool.shutdownNow();
+
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
+  }
+}
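
The heart of FooterStatCollector.run() above is the footer read itself. The following condensed sketch keeps only that part; the inputFormat, file and jc arguments stand in for the values the collector derives from its Partish, and the partition bookkeeping is dropped.

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    class FooterReadSketch {
      // Open a zero-length split just to obtain a record reader, then pull row count and raw data
      // size from the file footer without scanning any rows. Returns {numRows, rawDataSize}, or
      // null when the format does not expose footer stats.
      static long[] readFooterStats(InputFormat<?, ?> inputFormat, FileStatus file, JobConf jc) throws Exception {
        InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[0]);
        RecordReader<?, ?> reader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
        try {
          if (reader instanceof StatsProvidingRecordReader) {
            StatsProvidingRecordReader statsRR = (StatsProvidingRecordReader) reader;
            return new long[] { statsRR.getStats().getRowCount(), statsRR.getStats().getRawDataSize() };
          }
          return null;
        } finally {
          reader.close();
        }
      }
    }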

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
new file mode 100644
index 0000000..ecf3b9d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsTask.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * BasicStatsTask implementation. BasicStatsTask mainly deals with "collectable" stats: stats that
+ * require data scanning and are collected during query execution (unless the user explicitly
+ * requests data scanning just for the purpose of stats computation, using the "ANALYZE" command).
+ * All other stats are computed directly by the MetaStore. The rationale is that the MetaStore
+ * layer covers all Thrift calls and can provide better guarantees about the accuracy of those
+ * stats.
+ **/
+public class BasicStatsTask implements Serializable, IStatsProcessor {
+
+  private static final long serialVersionUID = 1L;
+  private static transient final Logger LOG = LoggerFactory.getLogger(BasicStatsTask.class);
+
+  private Table table;
+  private Collection<Partition> dpPartSpecs;
+  public boolean followedColStats;
+  private BasicStatsWork work;
+  private HiveConf conf;
+
+  protected transient LogHelper console;
+
+  public BasicStatsTask(HiveConf conf, BasicStatsWork work) {
+    super();
+    dpPartSpecs = null;
+    this.conf = conf;
+    console = new LogHelper(LOG);
+    this.work = work;
+  }
+
+  @Override
+  public int process(Hive db, Table tbl) throws Exception {
+
+    LOG.info("Executing stats task");
+    table = tbl;
+    return aggregateStats(db);
+  }
+
+  @Override
+  public void initialize(CompilationOpContext opContext) {
+  }
+
+  public StageType getType() {
+    return StageType.STATS;
+  }
+
+  public String getName() {
+    return "STATS";
+  }
+
+  private static class BasicStatsProcessor {
+
+    private Partish partish;
+    private FileStatus[] partfileStatus;
+    private BasicStatsWork work;
+    private boolean atomic;
+    private boolean followedColStats1;
+
+    public BasicStatsProcessor(Partish partish, BasicStatsWork work, HiveConf conf, boolean followedColStats2) {
+      this.partish = partish;
+      this.work = work;
+      atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+      followedColStats1 = followedColStats2;
+    }
+
+    public Object process(StatsAggregator statsAggregator) throws HiveException, MetaException {
+      Partish p = partish;
+      Map<String, String> parameters = p.getPartParameters();
+      if (p.isAcid()) {
+        StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+      }
+
+      if (work.isTargetRewritten()) {
+        StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+      }
+
+      // work.getTableSpecs() == null means this is not an ANALYZE command;
+      // in that case, if it is not followed by column stats, we should clear
+      // the column stats
+      // FIXME: move this to ColStat related part
+      if (!work.isExplicitAnalyze() && !followedColStats1) {
+        StatsSetupConst.clearColumnStatsState(parameters);
+      }
+      // non-partitioned tables:
+      // XXX: I don't agree with this logic
+      // FIXME: deprecate atomic? what's its purpose?
+      if (!existStats(parameters) && atomic) {
+        return null;
+      }
+      if (partfileStatus == null) {
+        LOG.warn("Partition file status is null for: " + partish.getPartition().getSpec());
+        return null;
+      }
+
+      // The collectable stats for the aggregator need to be cleared.
+      // For example, if a file is being loaded, the old number of rows is no longer valid.
+      // XXX: makes no sense to me... possibly not needed anymore
+      if (work.isClearAggregatorStats()) {
+        // we choose to keep the invalid stats and only change the setting.
+        StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+      }
+
+      updateQuickStats(parameters, partfileStatus);
+      if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
+        if (statsAggregator != null) {
+          String prefix = getAggregationPrefix(p.getTable(), p.getPartition());
+          updateStats(statsAggregator, parameters, prefix, atomic);
+        }
+      }
+
+      return p.getOutput();
+    }
+
+    public void collectFileStatus(Warehouse wh) throws MetaException {
+      Map<String, String> parameters = partish.getPartParameters();
+      if (!existStats(parameters) && atomic) {
+        return;
+      }
+      partfileStatus = wh.getFileStatusesForSD(partish.getPartSd());
+    }
+
+    @Deprecated
+    private boolean existStats(Map<String, String> parameters) {
+      return parameters.containsKey(StatsSetupConst.ROW_COUNT)
+          || parameters.containsKey(StatsSetupConst.NUM_FILES)
+          || parameters.containsKey(StatsSetupConst.TOTAL_SIZE)
+          || parameters.containsKey(StatsSetupConst.RAW_DATA_SIZE)
+          || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
+    }
+
+    private void updateQuickStats(Map<String, String> parameters, FileStatus[] partfileStatus) throws MetaException {
+      MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
+    }
+
+    private String getAggregationPrefix(Table table, Partition partition) throws MetaException {
+      String prefix = getAggregationPrefix0(table, partition);
+      String aggKey = prefix.endsWith(Path.SEPARATOR) ? prefix : prefix + Path.SEPARATOR;
+      return aggKey;
+    }
+
+    private String getAggregationPrefix0(Table table, Partition partition) throws MetaException {
+
+      // prefix is of the form dbName.tblName
+      String prefix = table.getDbName() + "." + org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.encodeTableName(table.getTableName());
+      // FIXME: this is a secret contract; reusing getAggKey() would create a closer relation to the StatsGatherer
+      // prefix = work.getAggKey();
+      prefix = prefix.toLowerCase();
+      if (partition != null) {
+        return Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+      }
+      return prefix;
+    }
+
+    private void updateStats(StatsAggregator statsAggregator, Map<String, String> parameters, String aggKey, boolean atomic) throws HiveException {
+
+      for (String statType : StatsSetupConst.statsRequireCompute) {
+        String value = statsAggregator.aggregateStats(aggKey, statType);
+        if (value != null && !value.isEmpty()) {
+          long longValue = Long.parseLong(value);
+
+          if (!work.isTargetRewritten()) {
+            String originalValue = parameters.get(statType);
+            if (originalValue != null) {
+              longValue += Long.parseLong(originalValue); // todo: invalid + valid = invalid
+            }
+          }
+          parameters.put(statType, String.valueOf(longValue));
+        } else {
+          if (atomic) {
+            throw new HiveException(ErrorMsg.STATSAGGREGATOR_MISSED_SOMESTATS, statType);
+          }
+        }
+      }
+    }
+
+  }
+
+
+  private int aggregateStats(Hive db) {
+
+    StatsAggregator statsAggregator = null;
+    int ret = 0;
+    StatsCollectionContext scc = null;
+    EnvironmentContext environmentContext = null;
+    environmentContext = new EnvironmentContext();
+    environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
+
+    try {
+      // Stats setup:
+      final Warehouse wh = new Warehouse(conf);
+      if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
+        try {
+          scc = getContext();
+          statsAggregator = createStatsAggregator(scc, conf);
+        } catch (HiveException e) {
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw e;
+          }
+          console.printError(ErrorMsg.STATS_SKIPPING_BY_ERROR.getErrorCodedMsg(e.toString()));
+        }
+      }
+
+      List<Partition> partitions = getPartitionsList(db);
+
+      String tableFullName = table.getDbName() + "." + table.getTableName();
+
+      List<Partish> partishes = new ArrayList<>();
+
+      if (partitions == null) {
+        Partish p;
+        partishes.add(p = new Partish.PTable(table));
+
+        BasicStatsProcessor basicStatsProcessor = new BasicStatsProcessor(p, work, conf, followedColStats);
+        basicStatsProcessor.collectFileStatus(wh);
+        Object res = basicStatsProcessor.process(statsAggregator);
+
+        if (res == null) {
+          return 0;
+        }
+        db.alterTable(tableFullName, (Table) res, environmentContext);
+
+        if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+          console.printInfo("Table " + tableFullName + " stats: [" + toString(p.getPartParameters()) + ']');
+        }
+        LOG.info("Table " + tableFullName + " stats: [" + toString(p.getPartParameters()) + ']');
+
+      } else {
+        // Partitioned table:
+        // Need to get the old stats of the partition
+        // and update the table stats based on the old and new stats.
+
+        List<Partition> updates = new ArrayList<Partition>();
+
+        final ExecutorService pool = buildBasicStatsExecutor();
+
+        final List<Future<Void>> futures = Lists.newLinkedList();
+        List<BasicStatsProcessor> processors = Lists.newLinkedList();
+
+        try {
+          for(final Partition partn : partitions) {
+            Partish p;
+            BasicStatsProcessor bsp = new BasicStatsProcessor(p = new Partish.PPart(table, partn), work, conf, followedColStats);
+            processors.add(bsp);
+
+            futures.add(pool.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                bsp.collectFileStatus(wh);
+                return null;
+              }
+            }));
+          }
+          pool.shutdown();
+          for (Future<Void> future : futures) {
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          LOG.debug("Cancelling " + futures.size() + " file stats lookup tasks");
+          //cancel other futures
+          for (Future future : futures) {
+            future.cancel(true);
+          }
+          // Fail the query if the stats are supposed to be reliable
+          if (work.isStatsReliable()) {
+            ret = 1;
+          }
+        } finally {
+          if (pool != null) {
+            pool.shutdownNow();
+          }
+          LOG.debug("Finished getting file stats of all partitions!");
+        }
+
+        for (BasicStatsProcessor basicStatsProcessor : processors) {
+          Object res = basicStatsProcessor.process(statsAggregator);
+          if (res == null) {
+            LOG.info("Partition " + basicStatsProcessor.partish.getPartition().getSpec() + " stats: [0]");
+            continue;
+          }
+          updates.add((Partition) res);
+          if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+            console.printInfo("Partition " + basicStatsProcessor.partish.getPartition().getSpec() + " stats: [" + toString(basicStatsProcessor.partish.getPartParameters()) + ']');
+          }
+          LOG.info("Partition " + basicStatsProcessor.partish.getPartition().getSpec() + " stats: [" + toString(basicStatsProcessor.partish.getPartParameters()) + ']');
+        }
+
+        if (!updates.isEmpty()) {
+          db.alterPartitions(tableFullName, updates, environmentContext);
+        }
+        if (work.isStatsReliable() && updates.size() != processors.size()) {
+          LOG.info("Stats should be reliadble...however seems like there were some issue.. => ret 1");
+          ret = 1;
+        }
+      }
+
+    } catch (Exception e) {
+      console.printInfo("[Warning] could not update stats.",
+          "Failed with exception " + e.getMessage() + "\n"
+              + StringUtils.stringifyException(e));
+
+      // Fail the query if the stats are supposed to be reliable
+      if (work.isStatsReliable()) {
+        ret = 1;
+      }
+    } finally {
+      if (statsAggregator != null) {
+        statsAggregator.closeConnection(scc);
+      }
+    }
+    // The return value of 0 indicates success,
+    // anything else indicates failure
+    return ret;
+  }
+
+  private BasicStatsWork getWork() {
+    return work;
+  }
+
+  private ExecutorService buildBasicStatsExecutor() {
+    //Get the file status up-front for all partitions. Beneficial in cases of blob storage systems
+    int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+    // In case thread count is set to 0, use single thread.
+    poolSize = Math.max(poolSize, 1);
+    final ExecutorService pool = Executors.newFixedThreadPool(poolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("stats-updater-thread-%d").build());
+    LOG.debug("Getting file stats of all partitions. threadpool size:" + poolSize);
+    return pool;
+  }
+
+  private StatsAggregator createStatsAggregator(StatsCollectionContext scc, HiveConf conf) throws HiveException {
+    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
+    if (factory == null) {
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
+    }
+    // initialize the stats publishing table for noscan, which has only a stats task;
+    // in the other cases the MR tasks initialize it in ExecDriver.java
+    StatsPublisher statsPublisher = factory.getStatsPublisher();
+    if (!statsPublisher.init(scc)) { // creating stats table if not exists
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+    }
+
+    // manufacture a StatsAggregator
+    StatsAggregator statsAggregator = factory.getStatsAggregator();
+    if (!statsAggregator.connect(scc)) {
+      throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl));
+    }
+    return statsAggregator;
+  }
+
+  private StatsCollectionContext getContext() throws HiveException {
+
+    StatsCollectionContext scc = new StatsCollectionContext(conf);
+    Task sourceTask = getWork().getSourceTask();
+    if (sourceTask == null) {
+      throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
+    }
+    scc.setTask(sourceTask);
+    scc.setStatsTmpDir(this.getWork().getStatsTmpDir());
+    return scc;
+  }
+
+
+  private String toString(Map<String, String> parameters) {
+    StringBuilder builder = new StringBuilder();
+    for (String statType : StatsSetupConst.supportedStats) {
+      String value = parameters.get(statType);
+      if (value != null) {
+        if (builder.length() > 0) {
+          builder.append(", ");
+        }
+        builder.append(statType).append('=').append(value);
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Get the list of partitions that need to update statistics.
+   * TODO: we should reuse the Partitions generated at compile time
+   * since getting the list of partitions is quite expensive.
+   *
+   * @return a list of partitions that need to update statistics.
+   * @throws HiveException
+   */
+  private List<Partition> getPartitionsList(Hive db) throws HiveException {
+    if (work.getLoadFileDesc() != null) {
+      return null; //we are in CTAS, so we know there are no partitions
+    }
+
+    List<Partition> list = new ArrayList<Partition>();
+
+    if (work.getTableSpecs() != null) {
+
+      // ANALYZE command
+      TableSpec tblSpec = work.getTableSpecs();
+      table = tblSpec.tableHandle;
+      if (!table.isPartitioned()) {
+        return null;
+      }
+      // get all partitions that matches with the partition spec
+      List<Partition> partitions = tblSpec.partitions;
+      if (partitions != null) {
+        for (Partition partn : partitions) {
+          list.add(partn);
+        }
+      }
+    } else if (work.getLoadTableDesc() != null) {
+
+      // INSERT OVERWRITE command
+      LoadTableDesc tbd = work.getLoadTableDesc();
+      table = db.getTable(tbd.getTable().getTableName());
+      if (!table.isPartitioned()) {
+        return null;
+      }
+      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
+      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
+        // If no dynamic partitions are generated, dpPartSpecs may not be initialized
+        if (dpPartSpecs != null) {
+          // load the list of DP partitions and return the list of partition specs
+          list.addAll(dpPartSpecs);
+        }
+      } else { // static partition
+        Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+        list.add(partn);
+      }
+    }
+    return list;
+  }
+
+  public Collection<Partition> getDpPartSpecs() {
+    return dpPartSpecs;
+  }
+
+  @Override
+  public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
+    this.dpPartSpecs = dpPartSpecs;
+  }
+
+}
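
The net effect of BasicStatsProcessor.process() above is to rewrite the basic-stats entries in the table or partition parameter map. A minimal sketch of that parameter handling, using the same StatsSetupConst keys, follows; the scanned values are placeholders, and the merge is shown for ROW_COUNT only, whereas updateStats() above applies it to every stat in statsRequireCompute.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.common.StatsSetupConst;

    class BasicStatsParamsSketch {
      // Sketch of the parameter handling in BasicStatsProcessor.process()/updateStats() above.
      static void applyBasicStats(Map<String, String> parameters, long scannedRows, long rawDataSize,
          long totalSize, long numFiles, boolean targetRewritten) {
        if (targetRewritten) {
          // the whole target was overwritten, so the freshly gathered stats are accurate
          StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
        }
        long numRows = scannedRows;
        if (!targetRewritten) {
          // INSERT INTO case: new counts are added on top of the existing ones (see updateStats() above)
          String old = parameters.get(StatsSetupConst.ROW_COUNT);
          if (old != null) {
            numRows += Long.parseLong(old);
          }
        }
        parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
        parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
        parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(totalSize));
        parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
      }

      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        applyBasicStats(params, 1000L, 123456L, 7890L, 3L, true);
        System.out.println(params);   // numRows/rawDataSize/totalSize/numFiles entries plus the accuracy marker
      }
    }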

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
new file mode 100644
index 0000000..7ce7a74
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.stats.ColumnStatisticsObjTranslator;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ColStatsProcessor implements IStatsProcessor {
+  private static transient final Logger LOG = LoggerFactory.getLogger(ColStatsProcessor.class);
+
+  private FetchOperator ftOp;
+  private FetchWork fWork;
+  private ColumnStatsDesc colStatDesc;
+  private HiveConf conf;
+  private boolean isStatsReliable;
+
+  public ColStatsProcessor(ColumnStatsDesc colStats, HiveConf conf) {
+    this.conf = conf;
+    fWork = colStats.getFWork();
+    colStatDesc = colStats;
+    isStatsReliable = conf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE);
+  }
+
+  @Override
+  public void initialize(CompilationOpContext opContext) {
+    try {
+      fWork.initializeForFetch(opContext);
+      JobConf job = new JobConf(conf);
+      ftOp = new FetchOperator(fWork, job);
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public int process(Hive db, Table tbl) throws Exception {
+    return persistColumnStats(db, tbl);
+  }
+
+  private List<ColumnStatistics> constructColumnStatsFromPackedRows(Table tbl1) throws HiveException, MetaException, IOException {
+
+    Table tbl = tbl1;
+
+    String partName = null;
+    List<String> colName = colStatDesc.getColName();
+    List<String> colType = colStatDesc.getColType();
+    boolean isTblLevel = colStatDesc.isTblLevel();
+
+    List<ColumnStatistics> stats = new ArrayList<ColumnStatistics>();
+    InspectableObject packedRow;
+    while ((packedRow = ftOp.getNextRow()) != null) {
+      if (packedRow.oi.getCategory() != ObjectInspector.Category.STRUCT) {
+        throw new HiveException("Unexpected object type encountered while unpacking row");
+      }
+
+      List<ColumnStatisticsObj> statsObjs = new ArrayList<ColumnStatisticsObj>();
+      StructObjectInspector soi = (StructObjectInspector) packedRow.oi;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      List<Object> list = soi.getStructFieldsDataAsList(packedRow.o);
+
+      List<FieldSchema> partColSchema = tbl.getPartCols();
+      // Partition columns are appended at the end; we only care about the stats columns
+      int numOfStatCols = isTblLevel ? fields.size() : fields.size() - partColSchema.size();
+      assert list != null;
+      for (int i = 0; i < numOfStatCols; i++) {
+        StructField structField = fields.get(i);
+        String columnName = colName.get(i);
+        String columnType = colType.get(i);
+        Object values = list.get(i);
+        try {
+          ColumnStatisticsObj statObj = ColumnStatisticsObjTranslator.readHiveStruct(columnName, columnType, structField, values);
+          statsObjs.add(statObj);
+        } catch (Exception e) {
+          if (isStatsReliable) {
+            throw new HiveException("Statistics collection failed while (hive.stats.reliable)", e);
+          } else {
+            LOG.debug("Because {} is infinite or NaN, we skip stats.", columnName, e);
+          }
+        }
+      }
+
+      if (!statsObjs.isEmpty()) {
+
+        if (!isTblLevel) {
+          List<String> partVals = new ArrayList<String>();
+          // Iterate over partition columns to figure out partition name
+          for (int i = fields.size() - partColSchema.size(); i < fields.size(); i++) {
+            Object partVal = ((PrimitiveObjectInspector) fields.get(i).getFieldObjectInspector()).getPrimitiveJavaObject(list.get(i));
+            partVals.add(partVal == null ? // could be null for default partition
+              this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
+          }
+          partName = Warehouse.makePartName(partColSchema, partVals);
+        }
+
+        ColumnStatisticsDesc statsDesc = buildColumnStatsDesc(tbl, partName, isTblLevel);
+        ColumnStatistics colStats = new ColumnStatistics();
+        colStats.setStatsDesc(statsDesc);
+        colStats.setStatsObj(statsObjs);
+        stats.add(colStats);
+      }
+    }
+    ftOp.clearFetchContext();
+    return stats;
+  }
+
+  private ColumnStatisticsDesc buildColumnStatsDesc(Table table, String partName, boolean isTblLevel) {
+    String dbName = table.getDbName();
+    assert dbName != null;
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(table.getTableName());
+    statsDesc.setIsTblLevel(isTblLevel);
+
+    if (!isTblLevel) {
+      statsDesc.setPartName(partName);
+    } else {
+      statsDesc.setPartName(null);
+    }
+    return statsDesc;
+  }
+
+  public int persistColumnStats(Hive db, Table tbl) throws HiveException, MetaException, IOException {
+    // Construct a column statistics object from the result
+
+    List<ColumnStatistics> colStats = constructColumnStatsFromPackedRows(tbl);
+    // Persist the column statistics object to the metastore
+    // Note, this function is shared for both table and partition column stats.
+    if (colStats.isEmpty()) {
+      return 0;
+    }
+    SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
+    request.setNeedMerge(colStatDesc.isNeedMerge());
+    db.setPartitionColumnStatistics(request);
+    return 0;
+  }
+
+  @Override
+  public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
+  }
+
+}
\ No newline at end of file
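
persistColumnStats() above ultimately reduces to a single metastore call. A trimmed sketch of that last step, reusing the same Thrift objects, follows; the statsObjs list stands in for what constructColumnStatsFromPackedRows() unpacks from the fetched rows, and the table-level case is shown (partition-level stats would also set the partition name on the descriptor).

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
    import org.apache.hadoop.hive.ql.metadata.Hive;

    class PersistColStatsSketch {
      // Trimmed version of the final step of persistColumnStats() above (table-level case).
      static void persist(Hive db, String dbName, String tableName,
          List<ColumnStatisticsObj> statsObjs, boolean needMerge) throws Exception {
        ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
        desc.setDbName(dbName);
        desc.setTableName(tableName);
        desc.setIsTblLevel(true);

        ColumnStatistics colStats = new ColumnStatistics();
        colStats.setStatsDesc(desc);
        colStats.setStatsObj(statsObjs);

        // One request may carry stats for many partitions; needMerge (taken from
        // ColumnStatsDesc.isNeedMerge() above) asks the metastore to merge with the
        // existing stats instead of simply overwriting them.
        SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
        request.setNeedMerge(needMerge);
        db.setPartitionColumnStatistics(request);
      }
    }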

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java
new file mode 100644
index 0000000..6485526
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColumnStatisticsObjTranslator.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.Decimal;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
+import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+
+public class ColumnStatisticsObjTranslator {
+
+  public static ColumnStatisticsObj readHiveStruct(String columnName, String columnType, StructField structField, Object values)
+      throws HiveException
+  {
+    // Get the field objectInspector, fieldName and the field object.
+    ObjectInspector foi = structField.getFieldObjectInspector();
+    Object f = values;
+    String fieldName = structField.getFieldName();
+    ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
+    statsObj.setColName(columnName);
+    statsObj.setColType(columnType);
+    try {
+      unpackStructObject(foi, f, fieldName, statsObj);
+      return statsObj;
+    } catch (Exception e) {
+      throw new HiveException("error calculating stats for column: " + structField.getFieldName(), e);
+    }
+  }
+
+  private static void unpackBooleanStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+    long v = ((LongObjectInspector) oi).get(o);
+    if (fName.equals("counttrues")) {
+      statsObj.getStatsData().getBooleanStats().setNumTrues(v);
+    } else if (fName.equals("countfalses")) {
+      statsObj.getStatsData().getBooleanStats().setNumFalses(v);
+    } else if (fName.equals("countnulls")) {
+      statsObj.getStatsData().getBooleanStats().setNumNulls(v);
+    }
+  }
+
+  @SuppressWarnings("serial")
+  static class UnsupportedDoubleException extends Exception {
+  }
+
+  private static void unpackDoubleStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
+    if (fName.equals("countnulls")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getDoubleStats().setNumNulls(v);
+    } else if (fName.equals("numdistinctvalues")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getDoubleStats().setNumDVs(v);
+    } else if (fName.equals("max")) {
+      double d = ((DoubleObjectInspector) oi).get(o);
+      if (Double.isInfinite(d) || Double.isNaN(d)) {
+        throw new UnsupportedDoubleException();
+      }
+      statsObj.getStatsData().getDoubleStats().setHighValue(d);
+    } else if (fName.equals("min")) {
+      double d = ((DoubleObjectInspector) oi).get(o);
+      if (Double.isInfinite(d) || Double.isNaN(d)) {
+        throw new UnsupportedDoubleException();
+      }
+      statsObj.getStatsData().getDoubleStats().setLowValue(d);
+    } else if (fName.equals("ndvbitvector")) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getDoubleStats().setBitVectors(buf);
+    }
+  }
+
+  private static void unpackDecimalStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+    if (fName.equals("countnulls")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getDecimalStats().setNumNulls(v);
+    } else if (fName.equals("numdistinctvalues")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getDecimalStats().setNumDVs(v);
+    } else if (fName.equals("max")) {
+      HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getDecimalStats().setHighValue(convertToThriftDecimal(d));
+    } else if (fName.equals("min")) {
+      HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getDecimalStats().setLowValue(convertToThriftDecimal(d));
+    } else if (fName.equals("ndvbitvector")) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getDecimalStats().setBitVectors(buf);
+    }
+  }
+
+  private static Decimal convertToThriftDecimal(HiveDecimal d) {
+    return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short) d.scale());
+  }
+
+  private static void unpackLongStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+    if (fName.equals("countnulls")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getLongStats().setNumNulls(v);
+    } else if (fName.equals("numdistinctvalues")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getLongStats().setNumDVs(v);
+    } else if (fName.equals("max")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getLongStats().setHighValue(v);
+    } else if (fName.equals("min")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getLongStats().setLowValue(v);
+    } else if (fName.equals("ndvbitvector")) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getLongStats().setBitVectors(buf);
+    }
+  }
+
+  private static void unpackStringStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+    if (fName.equals("countnulls")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getStringStats().setNumNulls(v);
+    } else if (fName.equals("numdistinctvalues")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getStringStats().setNumDVs(v);
+    } else if (fName.equals("avglength")) {
+      double d = ((DoubleObjectInspector) oi).get(o);
+      statsObj.getStatsData().getStringStats().setAvgColLen(d);
+    } else if (fName.equals("maxlength")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getStringStats().setMaxColLen(v);
+    } else if (fName.equals("ndvbitvector")) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getStringStats().setBitVectors(buf);
+    }
+  }
+
+  private static void unpackBinaryStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+    if (fName.equals("countnulls")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getBinaryStats().setNumNulls(v);
+    } else if (fName.equals("avglength")) {
+      double d = ((DoubleObjectInspector) oi).get(o);
+      statsObj.getStatsData().getBinaryStats().setAvgColLen(d);
+    } else if (fName.equals("maxlength")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getBinaryStats().setMaxColLen(v);
+    }
+  }
+
+  private static void unpackDateStats(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj statsObj) {
+    if (fName.equals("countnulls")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getDateStats().setNumNulls(v);
+    } else if (fName.equals("numdistinctvalues")) {
+      long v = ((LongObjectInspector) oi).get(o);
+      statsObj.getStatsData().getDateStats().setNumDVs(v);
+    } else if (fName.equals("max")) {
+      DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
+      statsObj.getStatsData().getDateStats().setHighValue(new Date(v.getDays()));
+    } else if (fName.equals("min")) {
+      DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
+      statsObj.getStatsData().getDateStats().setLowValue(new Date(v.getDays()));
+    } else if (fName.equals("ndvbitvector")) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
+      statsObj.getStatsData().getDateStats().setBitVectors(buf);
+    }
+  }
+
+  private static void unpackPrimitiveObject(ObjectInspector oi, Object o, String fieldName, ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
+    if (o == null) {
+      return;
+    }
+    // First infer the type of object
+    if (fieldName.equals("columntype")) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+      String s = ((StringObjectInspector) poi).getPrimitiveJavaObject(o);
+      ColumnStatisticsData statsData = new ColumnStatisticsData();
+
+      if (s.equalsIgnoreCase("long")) {
+        LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
+        statsData.setLongStats(longStats);
+        statsObj.setStatsData(statsData);
+      } else if (s.equalsIgnoreCase("double")) {
+        DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector();
+        statsData.setDoubleStats(doubleStats);
+        statsObj.setStatsData(statsData);
+      } else if (s.equalsIgnoreCase("string")) {
+        StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector();
+        statsData.setStringStats(stringStats);
+        statsObj.setStatsData(statsData);
+      } else if (s.equalsIgnoreCase("boolean")) {
+        BooleanColumnStatsData booleanStats = new BooleanColumnStatsData();
+        statsData.setBooleanStats(booleanStats);
+        statsObj.setStatsData(statsData);
+      } else if (s.equalsIgnoreCase("binary")) {
+        BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
+        statsData.setBinaryStats(binaryStats);
+        statsObj.setStatsData(statsData);
+      } else if (s.equalsIgnoreCase("decimal")) {
+        DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector();
+        statsData.setDecimalStats(decimalStats);
+        statsObj.setStatsData(statsData);
+      } else if (s.equalsIgnoreCase("date")) {
+        DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector();
+        statsData.setDateStats(dateStats);
+        statsObj.setStatsData(statsData);
+      }
+    } else {
+      // invoke the right unpack method depending on data type of the column
+      if (statsObj.getStatsData().isSetBooleanStats()) {
+        unpackBooleanStats(oi, o, fieldName, statsObj);
+      } else if (statsObj.getStatsData().isSetLongStats()) {
+        unpackLongStats(oi, o, fieldName, statsObj);
+      } else if (statsObj.getStatsData().isSetDoubleStats()) {
+        unpackDoubleStats(oi, o, fieldName, statsObj);
+      } else if (statsObj.getStatsData().isSetStringStats()) {
+        unpackStringStats(oi, o, fieldName, statsObj);
+      } else if (statsObj.getStatsData().isSetBinaryStats()) {
+        unpackBinaryStats(oi, o, fieldName, statsObj);
+      } else if (statsObj.getStatsData().isSetDecimalStats()) {
+        unpackDecimalStats(oi, o, fieldName, statsObj);
+      } else if (statsObj.getStatsData().isSetDateStats()) {
+        unpackDateStats(oi, o, fieldName, statsObj);
+      }
+    }
+  }
+
+  private static void unpackStructObject(ObjectInspector oi, Object o, String fName, ColumnStatisticsObj cStatsObj) throws UnsupportedDoubleException {
+    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new RuntimeException("Invalid object datatype : " + oi.getCategory().toString());
+    }
+
+    StructObjectInspector soi = (StructObjectInspector) oi;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(o);
+
+    for (int i = 0; i < fields.size(); i++) {
+      // Get the field objectInspector, fieldName and the field object.
+      ObjectInspector foi = fields.get(i).getFieldObjectInspector();
+      Object f = (list == null ? null : list.get(i));
+      String fieldName = fields.get(i).getFieldName();
+
+      if (foi.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+        unpackPrimitiveObject(foi, f, fieldName, cStatsObj);
+      } else {
+        unpackStructObject(foi, f, fieldName, cStatsObj);
+      }
+    }
+  }
+}
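
For reference, a minimal caller sketch (not part of this patch) may help show how the translator is meant to be driven: the output row of a "compute statistics for columns" query is assumed to carry one struct per column, and each struct field is handed to readHiveStruct together with that column's name and type. The helper name translateRow and its parameters (soi, row, colNames, colTypes) are placeholders; the snippet assumes java.util.ArrayList/List plus the imports already present in the file above.

  // Hypothetical helper, not part of the patch.
  static List<ColumnStatisticsObj> translateRow(StructObjectInspector soi, Object row,
      List<String> colNames, List<String> colTypes) throws HiveException {
    List<? extends StructField> fields = soi.getAllStructFieldRefs();
    List<Object> values = soi.getStructFieldsDataAsList(row);
    List<ColumnStatisticsObj> result = new ArrayList<>();
    for (int i = 0; i < fields.size(); i++) {
      // each value is itself a struct: (columntype, countnulls, numdistinctvalues, min, max, ...)
      result.add(ColumnStatisticsObjTranslator.readHiveStruct(
          colNames.get(i), colTypes.get(i), fields.get(i), values.get(i)));
    }
    return result;
  }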

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java
new file mode 100644
index 0000000..04219b5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/IStatsProcessor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+public interface IStatsProcessor {
+
+  void initialize(CompilationOpContext opContext);
+
+  int process(Hive db, Table tbl) throws Exception;
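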
+
+  void setDpPartSpecs(Collection<Partition> dpPartSpecs);
+
+}
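
The interface is small; a purely illustrative skeleton (not part of this patch) shows the contract an implementation has to fulfil. The class name NoOpStatsProcessor and the return-code convention (0 for success, non-zero for failure) are assumptions, not taken from the patch.

  // Illustrative skeleton only.
  public class NoOpStatsProcessor implements IStatsProcessor {

    private Collection<Partition> dpPartSpecs;

    @Override
    public void initialize(CompilationOpContext opContext) {
      // no operator setup needed for this sketch
    }

    @Override
    public int process(Hive db, Table tbl) throws Exception {
      // a real processor would gather or persist stats for tbl (and dpPartSpecs) here
      return 0;
    }

    @Override
    public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
      this.dpPartSpecs = dpPartSpecs;
    }
  }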

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
new file mode 100644
index 0000000..e8d3184
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/Partish.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+
+/**
+ * Cover class that gives a uniform view of a table or a partition, making it easier to operate on either
+ */
+public abstract class Partish {
+
+  public static Partish buildFor(Table table) {
+    return new PTable(table);
+  }
+
+  public static Partish buildFor(Partition part) {
+    return new PPart(part.getTable(), part);
+  }
+
+  public static Partish buildFor(Table table, Partition part) {
+    return new PPart(table, part);
+  }
+
+  // TODO: rename this method
+  @Deprecated
+  public final boolean isAcid() {
+    return AcidUtils.isFullAcidTable(getTable());
+  }
+
+  public abstract Table getTable();
+
+  public abstract Map<String, String> getPartParameters();
+
+  public abstract StorageDescriptor getPartSd();
+
+  public abstract Object getOutput() throws HiveException;
+
+  public abstract Partition getPartition();
+
+  public abstract Class<? extends InputFormat> getInputFormatClass() throws HiveException;
+
+  public abstract Class<? extends OutputFormat> getOutputFormatClass() throws HiveException;
+
+  public abstract String getLocation();
+
+  public abstract String getSimpleName();
+
+  public final String getPartishType() {
+    return getClass().getSimpleName();
+  }
+
+  static class PTable extends Partish {
+    private Table table;
+
+    public PTable(Table table) {
+      this.table = table;
+    }
+
+    @Override
+    public Table getTable() {
+      return table;
+    }
+
+    @Override
+    public Map<String, String> getPartParameters() {
+      return table.getTTable().getParameters();
+    }
+
+    @Override
+    public StorageDescriptor getPartSd() {
+      return table.getTTable().getSd();
+    }
+
+    @Override
+    public Object getOutput() throws HiveException {
+      return new Table(getTable().getTTable());
+    }
+
+    @Override
+    public Partition getPartition() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends InputFormat> getInputFormatClass() {
+      return table.getInputFormatClass();
+    }
+
+    @Override
+    public Class<? extends OutputFormat> getOutputFormatClass() {
+      return table.getOutputFormatClass();
+    }
+
+    @Override
+    public String getLocation() {
+      return table.getDataLocation().toString();
+    }
+
+    @Override
+    public String getSimpleName() {
+      return String.format("Table %s.%s", table.getDbName(), table.getTableName());
+    }
+  }
+
+  static class PPart extends Partish {
+    private Table table;
+    private Partition partition;
+
+    // FIXME: possibly the distinction between table/partition is not needed; however it was like this before... will change it later
+    public PPart(Table table, Partition partition) {
+      this.table = table;
+      this.partition = partition;
+    }
+
+    @Override
+    public Table getTable() {
+      return table;
+    }
+
+    @Override
+    public Map<String, String> getPartParameters() {
+      return partition.getTPartition().getParameters();
+    }
+
+    @Override
+    public StorageDescriptor getPartSd() {
+      return partition.getTPartition().getSd();
+    }
+
+    @Override
+    public Object getOutput() throws HiveException {
+      return new Partition(table, partition.getTPartition());
+    }
+
+    @Override
+    public Partition getPartition() {
+      return partition;
+    }
+
+    @Override
+    public Class<? extends InputFormat> getInputFormatClass() throws HiveException {
+      return partition.getInputFormatClass();
+    }
+
+    @Override
+    public Class<? extends OutputFormat> getOutputFormatClass() throws HiveException {
+      return partition.getOutputFormatClass();
+    }
+
+    @Override
+    public String getLocation() {
+      return partition.getLocation();
+    }
+
+    @Override
+    public String getSimpleName() {
+      return String.format("Partition %s.%s %s", table.getDbName(), table.getTableName(), partition.getSpec());
+    }
+
+  }
+
+}
\ No newline at end of file
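
As a usage sketch (not part of this patch), the factory methods yield one Partish per table or per partition, so callers can treat both uniformly. The helper name toPartishes and the idea that the caller has already resolved a table plus an optional partition set are assumptions for illustration; it needs java.util imports in addition to the ones in the file above.

  // Hypothetical helper, not taken from the patch.
  static List<Partish> toPartishes(Table table, Collection<Partition> parts) {
    List<Partish> items = new ArrayList<>();
    if (parts == null || parts.isEmpty()) {
      items.add(Partish.buildFor(table));        // unpartitioned table: a single item
    } else {
      for (Partition p : parts) {
        items.add(Partish.buildFor(table, p));   // one item per partition
      }
    }
    return items;
  }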

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 2c76f79..149a9ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -835,4 +835,4 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
     Assert.assertEquals(stringifyValues(expected), r);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2faf098..b877253 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -132,6 +132,7 @@ public class TestTxnCommands2 {
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
             "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
 
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.prepDb(hiveConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 3e4f6f6..8737369 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -80,6 +80,7 @@ public abstract class TxnCommandsBaseForTests {
       .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.prepDb(hiveConf);
     File f = new File(getWarehouseDir());

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
index d0a9982..4c865e0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/calcite/rules/TestHiveReduceExpressionsWithStatsRule.java
@@ -92,7 +92,7 @@ public class TestHiveReduceExpressionsWithStatsRule {
 
     builder = HiveRelFactories.HIVE_BUILDER.create(optCluster, schemaMock);
 
-    StatsSetupConst.setStatsStateForCreateTable(tableParams, Lists.newArrayList("_int"), "TRUE");
+    StatsSetupConst.setStatsStateForCreateTable(tableParams, Lists.newArrayList("_int"), StatsSetupConst.TRUE);
     tableParams.put(StatsSetupConst.ROW_COUNT, "3");
 
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/autoColumnStats_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_1.q b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
index 7955b07..cc32393 100644
--- a/ql/src/test/queries/clientpositive/autoColumnStats_1.q
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_1.q
@@ -60,6 +60,8 @@ drop table nzhang_part14;
 create table if not exists nzhang_part14 (key string)
   partitioned by (value string);
 
+desc formatted nzhang_part14;
+
 insert overwrite table nzhang_part14 partition(value) 
 select key, value from (
   select * from (select 'k1' as key, cast(null as string) as value from src limit 2)a 
@@ -69,6 +71,8 @@ select key, value from (
   select * from (select 'k3' as key, ' ' as value from src limit 2)c
 ) T;
 
+desc formatted nzhang_part14 partition (value=' ');
+
 explain select key from nzhang_part14;
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/autoColumnStats_10.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_10.q b/ql/src/test/queries/clientpositive/autoColumnStats_10.q
new file mode 100644
index 0000000..bf166d8
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_10.q
@@ -0,0 +1,52 @@
+set hive.mapred.mode=nonstrict;
+set hive.stats.column.autogather=true;
+
+drop table p;
+
+CREATE TABLE p(insert_num int, c1 tinyint, c2 smallint);
+
+desc formatted p;
+
+insert into p values (1,22,333);
+
+desc formatted p;
+
+alter table p replace columns (insert_num int, c1 STRING, c2 STRING);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
+
+insert into p values (2,11,111);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
+
+set hive.stats.column.autogather=false;
+
+drop table p;
+
+CREATE TABLE p(insert_num int, c1 tinyint, c2 smallint);
+
+desc formatted p;
+
+insert into p values (1,22,333);
+
+desc formatted p;
+
+alter table p replace columns (insert_num int, c1 STRING, c2 STRING);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;
+
+insert into p values (2,11,111);
+
+desc formatted p;
+
+desc formatted p insert_num;
+desc formatted p c1;

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/autoColumnStats_5a.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/autoColumnStats_5a.q b/ql/src/test/queries/clientpositive/autoColumnStats_5a.q
new file mode 100644
index 0000000..a8bce18
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/autoColumnStats_5a.q
@@ -0,0 +1,30 @@
+set hive.stats.column.autogather=true;
+set hive.mapred.mode=nonstrict;
+set hive.cli.print.header=true;
+SET hive.exec.schema.evolution=true;
+SET hive.vectorized.execution.enabled=false;
+set hive.fetch.task.conversion=none;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE partitioned1(a INT, b STRING) PARTITIONED BY(part INT) STORED AS TEXTFILE;
+
+explain extended 
+insert into table partitioned1 partition(part=1) values(1, 'original');
+
+insert into table partitioned1 partition(part=1) values(1, 'original');
+
+desc formatted partitioned1 partition(part=1);
+
+explain extended 
+insert into table partitioned1 partition(part=1) values(2, 'original'), (3, 'original'),(4, 'original');
+
+insert into table partitioned1 partition(part=1) values(2, 'original'), (3, 'original'),(4, 'original');
+
+explain insert into table partitioned1 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+desc formatted partitioned1;
+desc formatted partitioned1 partition(part=1);
+desc formatted partitioned1 partition(part=1) a;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/basicstat_partval.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/basicstat_partval.q b/ql/src/test/queries/clientpositive/basicstat_partval.q
new file mode 100644
index 0000000..2db472d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/basicstat_partval.q
@@ -0,0 +1,12 @@
+set hive.stats.autogather=true;
+
+CREATE TABLE p1(i int) partitioned by (p string);
+
+insert into p1 partition(p='a') values (1);
+insert into p1 partition(p='A') values (2),(3);
+
+describe formatted p1;
+describe formatted p1 partition(p='a');
+describe formatted p1 partition(p='A');
+
+

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/columnstats_partlvl.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/columnstats_partlvl.q b/ql/src/test/queries/clientpositive/columnstats_partlvl.q
index 2c92dfe..4283bca 100644
--- a/ql/src/test/queries/clientpositive/columnstats_partlvl.q
+++ b/ql/src/test/queries/clientpositive/columnstats_partlvl.q
@@ -14,6 +14,8 @@ explain extended
 analyze table Employee_Part partition (employeeSalary=2000.0) compute statistics for columns employeeID;
 analyze table Employee_Part partition (employeeSalary=2000.0) compute statistics for columns employeeID;
 
+describe formatted Employee_Part partition(employeeSalary=2000.0);
+
 explain 
 analyze table Employee_Part partition (employeeSalary=4000.0) compute statistics for columns employeeID;
 explain extended

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q b/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
index ead9a2d..c065edd 100644
--- a/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
+++ b/ql/src/test/queries/clientpositive/columnstats_partlvl_dp.q
@@ -16,6 +16,8 @@ explain
 analyze table Employee_Part partition (employeeSalary='4000.0', country) compute statistics for columns employeeName, employeeID;
 analyze table Employee_Part partition (employeeSalary='4000.0', country) compute statistics for columns employeeName, employeeID;
 
+describe formatted Employee_Part partition (employeeSalary='4000.0', country='USA');
+
 describe formatted Employee_Part partition (employeeSalary='4000.0', country='USA') employeeName;
 
 -- don't specify all partitioning keys

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/deleteAnalyze.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/deleteAnalyze.q b/ql/src/test/queries/clientpositive/deleteAnalyze.q
index 26123a6..5293ddf 100644
--- a/ql/src/test/queries/clientpositive/deleteAnalyze.q
+++ b/ql/src/test/queries/clientpositive/deleteAnalyze.q
@@ -20,6 +20,8 @@ describe formatted testdeci2 amount;
 
 analyze table testdeci2 compute statistics for columns;
 
+describe formatted testdeci2;
+
 set hive.stats.fetch.column.stats=true;
 
 analyze table testdeci2 compute statistics for columns;

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q b/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
index ceacc24..a89b707 100644
--- a/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
+++ b/ql/src/test/queries/clientpositive/exec_parallel_column_stats.q
@@ -1,5 +1,7 @@
 set hive.exec.parallel=true;
 
-explain analyze table src compute statistics for columns;
+create table t as select * from src;
 
-analyze table src compute statistics for columns;
\ No newline at end of file
+explain analyze table t compute statistics for columns;
+
+analyze table t compute statistics for columns;

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/outer_reference_windowed.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/outer_reference_windowed.q b/ql/src/test/queries/clientpositive/outer_reference_windowed.q
index cac6b41..3259ebe 100644
--- a/ql/src/test/queries/clientpositive/outer_reference_windowed.q
+++ b/ql/src/test/queries/clientpositive/outer_reference_windowed.q
@@ -34,6 +34,8 @@ ANALYZE TABLE e011_03 COMPUTE STATISTICS FOR COLUMNS;
 
 set hive.explain.user=false;
 
+describe formatted e011_01;
+
 explain select sum(sum(c1)) over() from e011_01;
 select sum(sum(c1)) over() from e011_01;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/smb_mapjoin_1.q b/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
index baf1690..b2394ad 100644
--- a/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
+++ b/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
@@ -12,6 +12,9 @@ load data local inpath '../../data/files/smbbucket_1.rc' overwrite into table sm
 load data local inpath '../../data/files/smbbucket_2.rc' overwrite into table smb_bucket_2;
 load data local inpath '../../data/files/smbbucket_3.rc' overwrite into table smb_bucket_3;
 
+desc formatted smb_bucket_1;
+select count(*) from smb_bucket_1;
+
 set hive.cbo.enable=false;
 set hive.optimize.bucketmapjoin = true;
 set hive.optimize.bucketmapjoin.sortedmerge = true;