Posted to commits@hive.apache.org by ha...@apache.org on 2017/11/07 22:33:58 UTC

[21/22] hive git commit: HIVE-16827 : Merge stats task and column stats task into a single task (Zoltan Haindrich via Ashutosh Chauhan)
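
As an orientation aid: after this patch the single StatsTask delegates to a list of IStatsProcessor implementations (BasicStatsTask, BasicStatsNoJobTask for footer scans, ColStatsProcessor for column stats), replacing the separate ColumnStatsTask and StatsNoJobTask deleted below. The following is only a minimal sketch of that composition pattern; the trimmed Processor interface and the runner class are illustrative stand-ins, not code from the patch.

// Illustrative sketch of the processor-composition pattern introduced by this patch.
// IStatsProcessor, BasicStatsTask, BasicStatsNoJobTask and ColStatsProcessor are the real
// names from the StatsTask.java diff below; this trimmed interface and runner are hypothetical.
import java.util.ArrayList;
import java.util.List;

public class StatsProcessorSketch {

  /** Trimmed-down stand-in for org.apache.hadoop.hive.ql.stats.IStatsProcessor. */
  interface Processor {
    int process(String db, String table) throws Exception;
  }

  /** Runs every configured processor in order and stops on the first failure,
   *  mirroring the loop in the new StatsTask.execute(). */
  static int runAll(List<Processor> processors, String db, String table) {
    try {
      for (Processor p : processors) {
        int ret = p.process(db, table);
        if (ret != 0) {
          return ret; // basic stats failed -> do not attempt column stats
        }
      }
      return 0;
    } catch (Exception e) {
      return 1;
    }
  }

  public static void main(String[] args) {
    List<Processor> processors = new ArrayList<>();
    // In the real task the list holds a BasicStatsTask (or BasicStatsNoJobTask for footer
    // scans) followed by a ColStatsProcessor when column stats are requested.
    processors.add((db, table) -> { System.out.println("basic stats for " + table); return 0; });
    processors.add((db, table) -> { System.out.println("column stats for " + table); return 0; });
    System.exit(runAll(processors, "default", "src"));
  }
}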

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
deleted file mode 100644
index 1f28688..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
-import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
-import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ColumnStatsTask implementation.
- **/
-
-public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private FetchOperator ftOp;
-  private static transient final Logger LOG = LoggerFactory.getLogger(ColumnStatsTask.class);
-
-  public ColumnStatsTask() {
-    super();
-  }
-
-  @Override
-  public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
-      CompilationOpContext opContext) {
-    super.initialize(queryState, queryPlan, ctx, opContext);
-    work.initializeForFetch(opContext);
-    try {
-      JobConf job = new JobConf(conf);
-      ftOp = new FetchOperator(work.getfWork(), job);
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void unpackBooleanStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) {
-    long v = ((LongObjectInspector) oi).get(o);
-    if (fName.equals("counttrues")) {
-      statsObj.getStatsData().getBooleanStats().setNumTrues(v);
-    } else if (fName.equals("countfalses")) {
-      statsObj.getStatsData().getBooleanStats().setNumFalses(v);
-    } else if (fName.equals("countnulls")) {
-      statsObj.getStatsData().getBooleanStats().setNumNulls(v);
-    }
-  }
-
-  @SuppressWarnings("serial")
-  class UnsupportedDoubleException extends Exception {
-  }
-
-  private void unpackDoubleStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
-    if (fName.equals("countnulls")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getDoubleStats().setNumNulls(v);
-    } else if (fName.equals("numdistinctvalues")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getDoubleStats().setNumDVs(v);
-    } else if (fName.equals("max")) {
-      double d = ((DoubleObjectInspector) oi).get(o);
-      if (Double.isInfinite(d) || Double.isNaN(d)) {
-        throw new UnsupportedDoubleException();
-      }
-      statsObj.getStatsData().getDoubleStats().setHighValue(d);
-    } else if (fName.equals("min")) {
-      double d = ((DoubleObjectInspector) oi).get(o);
-      if (Double.isInfinite(d) || Double.isNaN(d)) {
-        throw new UnsupportedDoubleException();
-      }
-      statsObj.getStatsData().getDoubleStats().setLowValue(d);
-    } else if (fName.equals("ndvbitvector")) {
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getDoubleStats().setBitVectors(buf);
-    }
-  }
-
-  private void unpackDecimalStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) {
-    if (fName.equals("countnulls")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getDecimalStats().setNumNulls(v);
-    } else if (fName.equals("numdistinctvalues")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getDecimalStats().setNumDVs(v);
-    } else if (fName.equals("max")) {
-      HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getDecimalStats().setHighValue(convertToThriftDecimal(d));
-    } else if (fName.equals("min")) {
-      HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getDecimalStats().setLowValue(convertToThriftDecimal(d));
-    } else if (fName.equals("ndvbitvector")) {
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getDecimalStats().setBitVectors(buf);
-    }
-  }
-
-  private Decimal convertToThriftDecimal(HiveDecimal d) {
-    return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short)d.scale());
-  }
-
-  private void unpackLongStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) {
-    if (fName.equals("countnulls")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getLongStats().setNumNulls(v);
-    } else if (fName.equals("numdistinctvalues")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getLongStats().setNumDVs(v);
-    } else if (fName.equals("max")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getLongStats().setHighValue(v);
-    } else if (fName.equals("min")) {
-      long  v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getLongStats().setLowValue(v);
-    } else if (fName.equals("ndvbitvector")) {
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getLongStats().setBitVectors(buf);
-    }
-  }
-
-  private void unpackStringStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) {
-    if (fName.equals("countnulls")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getStringStats().setNumNulls(v);
-    } else if (fName.equals("numdistinctvalues")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getStringStats().setNumDVs(v);
-    } else if (fName.equals("avglength")) {
-      double d = ((DoubleObjectInspector) oi).get(o);
-      statsObj.getStatsData().getStringStats().setAvgColLen(d);
-    } else if (fName.equals("maxlength")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getStringStats().setMaxColLen(v);
-    } else if (fName.equals("ndvbitvector")) {
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getStringStats().setBitVectors(buf);
-    }
-  }
-
-  private void unpackBinaryStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) {
-    if (fName.equals("countnulls")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getBinaryStats().setNumNulls(v);
-    } else if (fName.equals("avglength")) {
-      double d = ((DoubleObjectInspector) oi).get(o);
-      statsObj.getStatsData().getBinaryStats().setAvgColLen(d);
-    } else if (fName.equals("maxlength")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getBinaryStats().setMaxColLen(v);
-    }
-  }
-
-  private void unpackDateStats(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj statsObj) {
-    if (fName.equals("countnulls")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getDateStats().setNumNulls(v);
-    } else if (fName.equals("numdistinctvalues")) {
-      long v = ((LongObjectInspector) oi).get(o);
-      statsObj.getStatsData().getDateStats().setNumDVs(v);
-    } else if (fName.equals("max")) {
-      DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
-      statsObj.getStatsData().getDateStats().setHighValue(new Date(v.getDays()));
-    } else if (fName.equals("min")) {
-      DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
-      statsObj.getStatsData().getDateStats().setLowValue(new Date(v.getDays()));
-    } else if (fName.equals("ndvbitvector")) {
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-      byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
-      statsObj.getStatsData().getDateStats().setBitVectors(buf);
-    }
-  }
-
-  private void unpackPrimitiveObject (ObjectInspector oi, Object o, String fieldName,
-      ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
-    if (o == null) {
-      return;
-    }
-    // First infer the type of object
-    if (fieldName.equals("columntype")) {
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
-      String s = ((StringObjectInspector) poi).getPrimitiveJavaObject(o);
-      ColumnStatisticsData statsData = new ColumnStatisticsData();
-
-      if (s.equalsIgnoreCase("long")) {
-        LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
-        statsData.setLongStats(longStats);
-        statsObj.setStatsData(statsData);
-      } else if (s.equalsIgnoreCase("double")) {
-        DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector();
-        statsData.setDoubleStats(doubleStats);
-        statsObj.setStatsData(statsData);
-      } else if (s.equalsIgnoreCase("string")) {
-        StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector();
-        statsData.setStringStats(stringStats);
-        statsObj.setStatsData(statsData);
-      } else if (s.equalsIgnoreCase("boolean")) {
-        BooleanColumnStatsData booleanStats = new BooleanColumnStatsData();
-        statsData.setBooleanStats(booleanStats);
-        statsObj.setStatsData(statsData);
-      } else if (s.equalsIgnoreCase("binary")) {
-        BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-        statsData.setBinaryStats(binaryStats);
-        statsObj.setStatsData(statsData);
-      } else if (s.equalsIgnoreCase("decimal")) {
-        DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector();
-        statsData.setDecimalStats(decimalStats);
-        statsObj.setStatsData(statsData);
-      } else if (s.equalsIgnoreCase("date")) {
-        DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector();
-        statsData.setDateStats(dateStats);
-        statsObj.setStatsData(statsData);
-      }
-    } else {
-      // invoke the right unpack method depending on data type of the column
-      if (statsObj.getStatsData().isSetBooleanStats()) {
-        unpackBooleanStats(oi, o, fieldName, statsObj);
-      } else if (statsObj.getStatsData().isSetLongStats()) {
-        unpackLongStats(oi, o, fieldName, statsObj);
-      } else if (statsObj.getStatsData().isSetDoubleStats()) {
-        unpackDoubleStats(oi,o,fieldName, statsObj);
-      } else if (statsObj.getStatsData().isSetStringStats()) {
-        unpackStringStats(oi, o, fieldName, statsObj);
-      } else if (statsObj.getStatsData().isSetBinaryStats()) {
-        unpackBinaryStats(oi, o, fieldName, statsObj);
-      } else if (statsObj.getStatsData().isSetDecimalStats()) {
-        unpackDecimalStats(oi, o, fieldName, statsObj);
-      } else if (statsObj.getStatsData().isSetDateStats()) {
-        unpackDateStats(oi, o, fieldName, statsObj);
-      }
-    }
-  }
-
-  private void unpackStructObject(ObjectInspector oi, Object o, String fName,
-      ColumnStatisticsObj cStatsObj) throws UnsupportedDoubleException {
-    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
-      throw new RuntimeException("Invalid object datatype : " + oi.getCategory().toString());
-    }
-
-    StructObjectInspector soi = (StructObjectInspector) oi;
-    List<? extends StructField> fields = soi.getAllStructFieldRefs();
-    List<Object> list = soi.getStructFieldsDataAsList(o);
-
-    for (int i = 0; i < fields.size(); i++) {
-      // Get the field objectInspector, fieldName and the field object.
-      ObjectInspector foi = fields.get(i).getFieldObjectInspector();
-      Object f = (list == null ? null : list.get(i));
-      String fieldName = fields.get(i).getFieldName();
-
-      if (foi.getCategory() == ObjectInspector.Category.PRIMITIVE) {
-        unpackPrimitiveObject(foi, f, fieldName, cStatsObj);
-      } else {
-        unpackStructObject(foi, f, fieldName, cStatsObj);
-      }
-    }
-  }
-
-  private List<ColumnStatistics> constructColumnStatsFromPackedRows(
-      Hive db) throws HiveException, MetaException, IOException {
-
-    String currentDb = work.getCurrentDatabaseName();
-    String tableName = work.getColStats().getTableName();
-    String partName = null;
-    List<String> colName = work.getColStats().getColName();
-    List<String> colType = work.getColStats().getColType();
-    boolean isTblLevel = work.getColStats().isTblLevel();
-
-    List<ColumnStatistics> stats = new ArrayList<ColumnStatistics>();
-    InspectableObject packedRow;
-    Table tbl = db.getTable(currentDb, tableName);
-    while ((packedRow = ftOp.getNextRow()) != null) {
-      if (packedRow.oi.getCategory() != ObjectInspector.Category.STRUCT) {
-        throw new HiveException("Unexpected object type encountered while unpacking row");
-      }
-
-      List<ColumnStatisticsObj> statsObjs = new ArrayList<ColumnStatisticsObj>();
-      StructObjectInspector soi = (StructObjectInspector) packedRow.oi;
-      List<? extends StructField> fields = soi.getAllStructFieldRefs();
-      List<Object> list = soi.getStructFieldsDataAsList(packedRow.o);
-
-      List<FieldSchema> partColSchema = tbl.getPartCols();
-      // Partition columns are appended at end, we only care about stats column
-      int numOfStatCols = isTblLevel ? fields.size() : fields.size() - partColSchema.size();
-      for (int i = 0; i < numOfStatCols; i++) {
-        // Get the field objectInspector, fieldName and the field object.
-        ObjectInspector foi = fields.get(i).getFieldObjectInspector();
-        Object f = (list == null ? null : list.get(i));
-        String fieldName = fields.get(i).getFieldName();
-        ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
-        statsObj.setColName(colName.get(i));
-        statsObj.setColType(colType.get(i));
-        try {
-          unpackStructObject(foi, f, fieldName, statsObj);
-          statsObjs.add(statsObj);
-        } catch (UnsupportedDoubleException e) {
-          // due to infinity or nan.
-          LOG.info("Because {} is infinite or NaN, we skip stats.",  colName.get(i));
-        }
-      }
-
-      if (!isTblLevel) {
-        List<String> partVals = new ArrayList<String>();
-        // Iterate over partition columns to figure out partition name
-        for (int i = fields.size() - partColSchema.size(); i < fields.size(); i++) {
-          Object partVal = ((PrimitiveObjectInspector)fields.get(i).getFieldObjectInspector()).
-              getPrimitiveJavaObject(list.get(i));
-          partVals.add(partVal == null ? // could be null for default partition
-            this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
-        }
-        partName = Warehouse.makePartName(partColSchema, partVals);
-      }
-      String [] names = Utilities.getDbTableName(currentDb, tableName);
-      ColumnStatisticsDesc statsDesc = getColumnStatsDesc(names[0], names[1], partName, isTblLevel);
-      ColumnStatistics colStats = new ColumnStatistics();
-      colStats.setStatsDesc(statsDesc);
-      colStats.setStatsObj(statsObjs);
-      if (!statsObjs.isEmpty()) {
-        stats.add(colStats);
-      }
-    }
-    ftOp.clearFetchContext();
-    return stats;
-  }
-
-  private ColumnStatisticsDesc getColumnStatsDesc(String dbName, String tableName,
-      String partName, boolean isTblLevel)
-  {
-    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
-    statsDesc.setDbName(dbName);
-    statsDesc.setTableName(tableName);
-    statsDesc.setIsTblLevel(isTblLevel);
-
-    if (!isTblLevel) {
-      statsDesc.setPartName(partName);
-    } else {
-      statsDesc.setPartName(null);
-    }
-    return statsDesc;
-  }
-
-  private int persistColumnStats(Hive db) throws HiveException, MetaException, IOException {
-    // Construct a column statistics object from the result
-    List<ColumnStatistics> colStats = constructColumnStatsFromPackedRows(db);
-    // Persist the column statistics object to the metastore
-    // Note, this function is shared for both table and partition column stats.
-    if (colStats.isEmpty()) {
-      return 0;
-    }
-    SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
-    if (work.getColStats() != null && work.getColStats().getNumBitVector() > 0) {
-      request.setNeedMerge(true);
-    }
-    db.setPartitionColumnStatistics(request);
-    return 0;
-  }
-
-  @Override
-  public int execute(DriverContext driverContext) {
-    if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
-      return 0;
-    }
-    try {
-      Hive db = getHive();
-      return persistColumnStats(db);
-    } catch (Exception e) {
-      LOG.error("Failed to run column stats task", e);
-    }
-    return 1;
-  }
-
-  @Override
-  public StageType getType() {
-    return StageType.COLUMNSTATS;
-  }
-
-  @Override
-  public String getName() {
-    return "COLUMNSTATS TASK";
-  }
-}
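
The deleted ColumnStatsTask above unpacked the rows produced for each column into metastore Thrift objects and persisted them via Hive.setPartitionColumnStatistics(new SetPartitionsStatsRequest(stats)). As a minimal sketch of the payload it assembled, the example below builds table-level statistics for a single bigint column; the database/table/column names and the numbers are made up, while the Thrift classes and setters mirror the code above.

// Minimal sketch of the Thrift payload ColumnStatsTask assembled before calling
// db.setPartitionColumnStatistics(new SetPartitionsStatsRequest(stats)).
// Names and values are hypothetical; the API calls mirror the deleted code above.
import java.util.Collections;

import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

public class ColumnStatsPayloadSketch {

  static ColumnStatistics buildLongColumnStats() {
    // Table-level descriptor (partName stays unset, as in getColumnStatsDesc above).
    ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
    desc.setDbName("default");
    desc.setTableName("src");
    desc.setIsTblLevel(true);

    // Per-column stats data for a bigint column.
    LongColumnStatsData longStats = new LongColumnStatsData();
    longStats.setNumNulls(0);
    longStats.setNumDVs(309);
    longStats.setLowValue(0);
    longStats.setHighValue(498);

    ColumnStatisticsData data = new ColumnStatisticsData();
    data.setLongStats(longStats);

    ColumnStatisticsObj obj = new ColumnStatisticsObj();
    obj.setColName("id");
    obj.setColType("bigint");
    obj.setStatsData(data);

    ColumnStatistics colStats = new ColumnStatistics();
    colStats.setStatsDesc(desc);
    colStats.setStatsObj(Collections.singletonList(obj));
    return colStats;
  }

  public static void main(String[] args) {
    System.out.println(buildLongColumnStats());
  }
}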

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2331498..b4989f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
-import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
@@ -975,7 +974,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
         int bucketProperty =
             bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
-        int bucketNum = 
+        int bucketNum =
           BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
         writerOffset = 0;
         if (multiFileSpray) {
@@ -1452,7 +1451,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
 
     StatsCollectionContext sContext = new StatsCollectionContext(hconf);
-    sContext.setStatsTmpDir(conf.getStatsTmpDir());
+    sContext.setStatsTmpDir(conf.getTmpStatsDir());
     if (!statsPublisher.connect(sContext)) {
       // just return, stats gathering should not block the main query
       LOG.error("StatsPublishing error: cannot connect to database");

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
deleted file mode 100644
index c333c49..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.ReflectionUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * StatsNoJobTask is used in cases where stats collection is the only task for the given query (no
- * parent MR or Tez job), e.g. ANALYZE with noscan for file formats that implement the
- * StatsProvidingRecordReader interface: the ORC format (which implements
- * StatsProvidingRecordReader) stores column statistics for all columns in the file footer. It is much
- * faster to compute the table/partition statistics by reading the footer than by scanning all the
- * rows. This task can be used for computing basic stats like numFiles, numRows, fileSize and
- * rawDataSize from the ORC footer.
- **/
-public class StatsNoJobTask extends Task<StatsNoJobWork> implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private static transient final Logger LOG = LoggerFactory.getLogger(StatsNoJobTask.class);
-  private ConcurrentMap<String, Partition> partUpdates;
-  private Table table;
-  private String tableFullName;
-  private JobConf jc = null;
-
-  public StatsNoJobTask() {
-    super();
-  }
-
-  @Override
-  public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
-      CompilationOpContext opContext) {
-    super.initialize(queryState, queryPlan, driverContext, opContext);
-    jc = new JobConf(conf);
-  }
-
-  @Override
-  public int execute(DriverContext driverContext) {
-
-    LOG.info("Executing stats (no job) task");
-
-    String tableName = "";
-    ExecutorService threadPool = null;
-    Hive db = getHive();
-    try {
-      tableName = work.getTableSpecs().tableName;
-      table = db.getTable(tableName);
-      int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
-      tableFullName = table.getFullyQualifiedName();
-      threadPool = Executors.newFixedThreadPool(numThreads,
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StatsNoJobTask-Thread-%d")
-              .build());
-      partUpdates = new MapMaker().concurrencyLevel(numThreads).makeMap();
-      LOG.info("Initialized threadpool for stats computation with {} threads", numThreads);
-    } catch (HiveException e) {
-      LOG.error("Cannot get table {}", tableName, e);
-      console.printError("Cannot get table " + tableName, e.toString());
-    }
-
-    return aggregateStats(threadPool, db);
-  }
-
-  @Override
-  public StageType getType() {
-    return StageType.STATS;
-  }
-
-  @Override
-  public String getName() {
-    return "STATS-NO-JOB";
-  }
-
-  class StatsCollection implements Runnable {
-
-    private final Partition partn;
-
-    public StatsCollection(Partition part) {
-      this.partn = part;
-    }
-
-    @Override
-    public void run() {
-
-      // get the list of partitions
-      org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
-      Map<String, String> parameters = tPart.getParameters();
-
-      try {
-        Path dir = new Path(tPart.getSd().getLocation());
-        long numRows = 0;
-        long rawDataSize = 0;
-        long fileSize = 0;
-        long numFiles = 0;
-        FileSystem fs = dir.getFileSystem(conf);
-        FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
-
-        boolean statsAvailable = false;
-        for(FileStatus file: fileList) {
-          if (!file.isDir()) {
-            InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(
-                partn.getInputFormatClass(), jc);
-            InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0,
-                new String[] { partn.getLocation() });
-            org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
-                inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
-            StatsProvidingRecordReader statsRR;
-            if (recordReader instanceof StatsProvidingRecordReader) {
-              statsRR = (StatsProvidingRecordReader) recordReader;
-              rawDataSize += statsRR.getStats().getRawDataSize();
-              numRows += statsRR.getStats().getRowCount();
-              fileSize += file.getLen();
-              numFiles += 1;
-              statsAvailable = true;
-            }
-            recordReader.close();
-          }
-        }
-
-        if (statsAvailable) {
-          parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
-          parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
-          parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
-          parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
-
-          partUpdates.put(tPart.getSd().getLocation(), new Partition(table, tPart));
-
-          // printout console and debug logs
-          String threadName = Thread.currentThread().getName();
-          String msg = "Partition " + tableFullName + partn.getSpec() + " stats: ["
-              + toString(parameters) + ']';
-          LOG.debug("{}: {}", threadName, msg);
-          console.printInfo(msg);
-        } else {
-          String threadName = Thread.currentThread().getName();
-          String msg = "Partition " + tableFullName + partn.getSpec() + " does not provide stats.";
-          LOG.debug("{}: {}", threadName, msg);
-        }
-      } catch (Exception e) {
-        console.printInfo("[Warning] could not update stats for " + tableFullName + partn.getSpec()
-            + ".",
-            "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
-
-        // Before updating the partition params, if any partition params is null
-        // and if statsReliable is true then updatePartition() function  will fail
-        // the task by returning 1
-        if (work.isStatsReliable()) {
-          partUpdates.put(tPart.getSd().getLocation(), null);
-        }
-      }
-    }
-
-    private String toString(Map<String, String> parameters) {
-      StringBuilder builder = new StringBuilder();
-      for (String statType : StatsSetupConst.supportedStats) {
-        String value = parameters.get(statType);
-        if (value != null) {
-          if (builder.length() > 0) {
-            builder.append(", ");
-          }
-          builder.append(statType).append('=').append(value);
-        }
-      }
-      return builder.toString();
-    }
-
-  }
-
-  private int aggregateStats(ExecutorService threadPool, Hive db) {
-    int ret = 0;
-
-    try {
-      Collection<Partition> partitions = null;
-      if (work.getPrunedPartitionList() == null) {
-        partitions = getPartitionsList();
-      } else {
-        partitions = work.getPrunedPartitionList().getPartitions();
-      }
-
-      // non-partitioned table
-      if (partitions == null) {
-        org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
-        Map<String, String> parameters = tTable.getParameters();
-        try {
-          Path dir = new Path(tTable.getSd().getLocation());
-          LOG.debug("Aggregating stats for " + dir);
-          long numRows = 0;
-          long rawDataSize = 0;
-          long fileSize = 0;
-          long numFiles = 0;
-          FileSystem fs = dir.getFileSystem(conf);
-          FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
-
-          boolean statsAvailable = false;
-          for(FileStatus file: fileList) {
-            LOG.debug("Computing stats for " + file);
-            if (!file.isDir()) {
-              InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(
-                  table.getInputFormatClass(), jc);
-              InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { table
-                  .getDataLocation().toString() });
-              if (file.getLen() == 0) {
-                numFiles += 1;
-                statsAvailable = true;
-              } else {
-                org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
-                    inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
-                StatsProvidingRecordReader statsRR;
-                if (recordReader instanceof StatsProvidingRecordReader) {
-                  statsRR = (StatsProvidingRecordReader) recordReader;
-                  numRows += statsRR.getStats().getRowCount();
-                  rawDataSize += statsRR.getStats().getRawDataSize();
-                  fileSize += file.getLen();
-                  numFiles += 1;
-                  statsAvailable = true;
-                }
-                recordReader.close();
-              }
-            }
-          }
-
-          if (statsAvailable) {
-            parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
-            parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
-            parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
-            parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
-            EnvironmentContext environmentContext = new EnvironmentContext();
-            environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
-
-            db.alterTable(table, environmentContext);
-
-            String msg = "Table " + tableFullName + " stats: [" + toString(parameters) + ']';
-            if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-              Utilities.FILE_OP_LOGGER.trace(msg);
-            }
-            console.printInfo(msg);
-            LOG.debug("Table {} does not provide stats.", tableFullName);
-          }
-        } catch (Exception e) {
-          console.printInfo("[Warning] could not update stats for " + tableFullName + ".",
-              "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
-        }
-      } else {
-
-        // Partitioned table
-        for (Partition partn : partitions) {
-          threadPool.execute(new StatsCollection(partn));
-        }
-
-        LOG.debug("Stats collection waiting for threadpool to shutdown..");
-        shutdownAndAwaitTermination(threadPool);
-        LOG.debug("Stats collection threadpool shutdown successful.");
-
-        ret = updatePartitions(db);
-      }
-
-    } catch (Exception e) {
-      // Fail the query if the stats are supposed to be reliable
-      if (work.isStatsReliable()) {
-        ret = -1;
-      }
-    }
-
-    // The return value of 0 indicates success,
-    // anything else indicates failure
-    return ret;
-  }
-
-  private int updatePartitions(Hive db) throws InvalidOperationException, HiveException {
-    if (!partUpdates.isEmpty()) {
-      List<Partition> updatedParts = Lists.newArrayList(partUpdates.values());
-      if (updatedParts.contains(null) && work.isStatsReliable()) {
-        LOG.debug("Stats requested to be reliable. Empty stats found and hence failing the task.");
-        return -1;
-      } else {
-        LOG.debug("Bulk updating partitions..");
-        EnvironmentContext environmentContext = new EnvironmentContext();
-        environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
-        db.alterPartitions(tableFullName, Lists.newArrayList(partUpdates.values()),
-            environmentContext);
-        LOG.debug("Bulk updated {} partitions.", partUpdates.values().size());
-      }
-    }
-    return 0;
-  }
-
-  private void shutdownAndAwaitTermination(ExecutorService threadPool) {
-
-    // Disable new tasks from being submitted
-    threadPool.shutdown();
-    try {
-
-      // Wait a while for existing tasks to terminate
-      while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-        LOG.debug("Waiting for all stats tasks to finish...");
-      }
-      // Cancel currently executing tasks
-      threadPool.shutdownNow();
-
-      // Wait a while for tasks to respond to being cancelled
-      if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
-        LOG.debug("Stats collection thread pool did not terminate");
-      }
-    } catch (InterruptedException ie) {
-
-      // Cancel again if current thread also interrupted
-      threadPool.shutdownNow();
-
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  private String toString(Map<String, String> parameters) {
-    StringBuilder builder = new StringBuilder();
-    for (String statType : StatsSetupConst.supportedStats) {
-      String value = parameters.get(statType);
-      if (value != null) {
-        if (builder.length() > 0) {
-          builder.append(", ");
-        }
-        builder.append(statType).append('=').append(value);
-      }
-    }
-    return builder.toString();
-  }
-
-  private List<Partition> getPartitionsList() throws HiveException {
-    if (work.getTableSpecs() != null) {
-      TableSpec tblSpec = work.getTableSpecs();
-      table = tblSpec.tableHandle;
-      if (!table.isPartitioned()) {
-        return null;
-      } else {
-        return tblSpec.partitions;
-      }
-    }
-    return null;
-  }
-}
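
The deleted StatsNoJobTask above computes basic statistics from file footers instead of scanning rows. Below is a condensed sketch of that core loop: open a record reader per file and, when it implements StatsProvidingRecordReader, accumulate the row count and raw data size from its footer stats. The aggregateFooterStats helper, its signature and the non-recursive directory listing are simplifications for illustration; the real task uses HiveStatsUtils.getFileStatusRecurse and a thread pool per partition.

// Condensed sketch of the footer-scan loop from the deleted StatsNoJobTask above.
// The helper name and parameters are illustrative; the calls used (getRecordReader,
// StatsProvidingRecordReader, StatsSetupConst keys) are taken from that code.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class FooterStatsSketch {

  static Map<String, String> aggregateFooterStats(Path dir, InputFormat<?, ?> inputFormat,
      JobConf jc) throws IOException {
    long numRows = 0, rawDataSize = 0, fileSize = 0, numFiles = 0;
    FileSystem fs = dir.getFileSystem(jc);
    for (FileStatus file : fs.listStatus(dir)) { // the real task recurses into subdirectories
      if (file.isDirectory()) {
        continue;
      }
      // A zero-length dummy split is enough: ORC reads its footer when the reader is created.
      FileSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[0]);
      RecordReader<?, ?> reader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
      try {
        if (reader instanceof StatsProvidingRecordReader) {
          StatsProvidingRecordReader statsRR = (StatsProvidingRecordReader) reader;
          numRows += statsRR.getStats().getRowCount();
          rawDataSize += statsRR.getStats().getRawDataSize();
          fileSize += file.getLen();
          numFiles++;
        }
      } finally {
        reader.close();
      }
    }
    Map<String, String> parameters = new HashMap<>();
    parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
    parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
    parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
    parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
    return parameters;
  }
}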

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 682b42c..567126e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -16,501 +16,143 @@
  * limitations under the License.
  */
 
-
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.stats.StatsAggregator;
-import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
-import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.stats.BasicStatsNoJobTask;
+import org.apache.hadoop.hive.ql.stats.BasicStatsTask;
+import org.apache.hadoop.hive.ql.stats.ColStatsProcessor;
+import org.apache.hadoop.hive.ql.stats.IStatsProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
- * stats that require data scanning and are collected during query execution (unless the user
- * explicitly requests data scanning just for the purpose of stats computation using the "ANALYZE"
- * command). All other stats are computed directly by the MetaStore, the rationale being that the
- * MetaStore layer covers all Thrift calls and provides better guarantees about the accuracy of
- * those stats.
+ * StatsTask implementation.
  **/
-public class StatsTask extends Task<StatsWork> implements Serializable {
 
+public class StatsTask extends Task<StatsWork> implements Serializable {
   private static final long serialVersionUID = 1L;
   private static transient final Logger LOG = LoggerFactory.getLogger(StatsTask.class);
 
-  private Table table;
-  private Collection<Partition> dpPartSpecs;
-
   public StatsTask() {
     super();
-    dpPartSpecs = null;
   }
 
-  @Override
-  protected void receiveFeed(FeedType feedType, Object feedValue) {
-    // this method should be called by MoveTask when there are dynamic partitions generated
-    if (feedType == FeedType.DYNAMIC_PARTITIONS) {
-      dpPartSpecs = (Collection<Partition>) feedValue;
-    }
-  }
+  List<IStatsProcessor> processors = new ArrayList<>();
 
   @Override
-  public int execute(DriverContext driverContext) {
-    if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
-      return 0;
-    }
-    LOG.info("Executing stats task");
-    // Make sure that it is either an ANALYZE, INSERT OVERWRITE (maybe load) or CTAS command
-    short workComponentsPresent = 0;
-    if (work.getLoadTableDesc() != null) {
-      workComponentsPresent++;
-    }
-    if (work.getTableSpecs() != null) {
-      workComponentsPresent++;
+  public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
+      CompilationOpContext opContext) {
+    super.initialize(queryState, queryPlan, ctx, opContext);
+
+    if (work.getBasicStatsWork() != null) {
+      BasicStatsTask task = new BasicStatsTask(conf, work.getBasicStatsWork());
+      task.followedColStats = work.hasColStats();
+      processors.add(0, task);
+    } else if (work.isFooterScan()) {
+      BasicStatsNoJobTask t = new BasicStatsNoJobTask(conf, work.getBasicStatsNoJobWork());
+      processors.add(0, t);
     }
-    if (work.getLoadFileDesc() != null) {
-      workComponentsPresent++;
+    if (work.hasColStats()) {
+      processors.add(new ColStatsProcessor(work.getColStats(), conf));
     }
 
-    assert (workComponentsPresent == 1);
-
-    String tableName = "";
-    Hive hive = getHive();
-    try {
-      if (work.getLoadTableDesc() != null) {
-        tableName = work.getLoadTableDesc().getTable().getTableName();
-      } else if (work.getTableSpecs() != null){
-        tableName = work.getTableSpecs().tableName;
-      } else {
-        tableName = work.getLoadFileDesc().getDestinationCreateTable();
-      }
-
-      table = hive.getTable(tableName);
-
-    } catch (HiveException e) {
-      LOG.error("Cannot get table {}", tableName, e);
-      console.printError("Cannot get table " + tableName, e.toString());
+    for (IStatsProcessor p : processors) {
+      p.initialize(opContext);
     }
-
-    return aggregateStats(hive);
-
   }
 
-  @Override
-  public StageType getType() {
-    return StageType.STATS;
-  }
 
   @Override
-  public String getName() {
-    return "STATS";
-  }
-
-  private int aggregateStats(Hive db) {
-
-    StatsAggregator statsAggregator = null;
+  public int execute(DriverContext driverContext) {
+    if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
+      return 0;
+    }
+    if (work.isAggregating() && work.isFooterScan()) {
+      throw new RuntimeException("Can not have both basic stats work and stats no job work!");
+    }
     int ret = 0;
-    StatsCollectionContext scc = null;
-    EnvironmentContext environmentContext = null;
     try {
-      // Stats setup:
-      final Warehouse wh = new Warehouse(conf);
-      if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
-        try {
-          scc = getContext();
-          statsAggregator = createStatsAggregator(scc, conf);
-        } catch (HiveException e) {
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
-            throw e;
-          }
-          console.printError(ErrorMsg.STATS_SKIPPING_BY_ERROR.getErrorCodedMsg(e.toString()));
-        }
-      }
-
-      List<Partition> partitions = getPartitionsList(db);
-      boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
-
-      String tableFullName = table.getFullyQualifiedName();
-
-      if (partitions == null) {
-        org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
-        Map<String, String> parameters = tTable.getParameters();
-        // In the following scenarios, we need to reset the stats to true.
-        // work.getTableSpecs() != null means analyze command
-        // work.getLoadTableDesc().getReplace() is true means insert overwrite command 
-        // work.getLoadFileDesc().getDestinationCreateTable().isEmpty() means CTAS etc.
-        // acidTable will not have accurate stats unless it is set through analyze command.
-        if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) {
-          StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-        } else if (work.getTableSpecs() != null
-            || (work.getLoadTableDesc() != null
-                && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
-            || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
-                .getDestinationCreateTable().isEmpty())) {
-          StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
-        }
-        // non-partitioned tables:
-        if (!existStats(parameters) && atomic) {
-          return 0;
-        }
-
-        // The collectable stats for the aggregator needs to be cleared.
-        // For eg. if a file is being loaded, the old number of rows are not valid
-        if (work.isClearAggregatorStats()) {
-          // we choose to keep the invalid stats and only change the setting.
-          StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-        }
-
-        updateQuickStats(wh, parameters, tTable.getSd());
-        if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
-          if (statsAggregator != null) {
-            String prefix = getAggregationPrefix(table, null);
-            updateStats(statsAggregator, parameters, prefix, atomic);
-          }
-          // write table stats to metastore
-          if (!getWork().getNoStatsAggregator()) {
-            environmentContext = new EnvironmentContext();
-            environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
-                StatsSetupConst.TASK);
-          }
-        }
-
-        getHive().alterTable(table, environmentContext);
-        if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
-          console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
-        }
-        LOG.info("Table {} stats: [{}]", tableFullName, toString(parameters));
-        if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-          Utilities.FILE_OP_LOGGER.trace(
-              "Table " + tableFullName + " stats: [" + toString(parameters) + ']');
-        }
-      } else {
-        // Partitioned table:
-        // Need to get the old stats of the partition
-        // and update the table stats based on the old and new stats.
-        List<Partition> updates = new ArrayList<Partition>();
-
-        //Get the file status up-front for all partitions. Beneficial in cases of blob storage systems
-        final Map<String, FileStatus[]> fileStatusMap = new ConcurrentHashMap<String, FileStatus[]>();
-        int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
-        // In case thread count is set to 0, use single thread.
-        poolSize = Math.max(poolSize, 1);
-        final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
-          new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("stats-updater-thread-%d")
-            .build());
-        final List<Future<Void>> futures = Lists.newLinkedList();
-        LOG.debug("Getting file stats of all partitions. threadpool size: {}", poolSize);
-        try {
-          for(final Partition partn : partitions) {
-            final String partitionName = partn.getName();
-            final org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
-            Map<String, String> parameters = tPart.getParameters();
-
-            if (!existStats(parameters) && atomic) {
-              continue;
-            }
-            futures.add(pool.submit(new Callable<Void>() {
-              @Override
-              public Void call() throws Exception {
-                FileStatus[] partfileStatus = wh.getFileStatusesForSD(tPart.getSd());
-                fileStatusMap.put(partitionName,  partfileStatus);
-                return null;
-              }
-            }));
-          }
-          pool.shutdown();
-          for(Future<Void> future : futures) {
-            future.get();
-          }
-        } catch (InterruptedException e) {
-          LOG.debug("Cancelling {} file stats lookup tasks", futures.size());
-          //cancel other futures
-          for (Future future : futures) {
-            future.cancel(true);
-          }
-          // Fail the query if the stats are supposed to be reliable
-          if (work.isStatsReliable()) {
-            ret = 1;
-          }
-        } finally {
-          if (pool != null) {
-            pool.shutdownNow();
-          }
-          LOG.debug("Finished getting file stats of all partitions");
-        }
 
-        for (Partition partn : partitions) {
-          //
-          // get the old partition stats
-          //
-          org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
-          Map<String, String> parameters = tPart.getParameters();
-          if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) {
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-          } else if (work.getTableSpecs() != null
-              || (work.getLoadTableDesc() != null
-                  && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
-              || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
-                  .getDestinationCreateTable().isEmpty())) {
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
-          }
-          //only when the stats exist, it is added to fileStatusMap
-          if (!fileStatusMap.containsKey(partn.getName())) {
-            continue;
-          }
-
-          // The collectable stats for the aggregator needs to be cleared.
-          // For eg. if a file is being loaded, the old number of rows are not valid
-          if (work.isClearAggregatorStats()) {
-            // we choose to keep the invalid stats and only change the setting.
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-          }
+      if (work.isFooterScan()) {
+        work.getBasicStatsNoJobWork().setPartitions(work.getPartitions());
+      }
 
-          updateQuickStats(parameters, fileStatusMap.get(partn.getName()));
-          if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
-            if (statsAggregator != null) {
-              String prefix = getAggregationPrefix(table, partn);
-              updateStats(statsAggregator, parameters, prefix, atomic);
-            }
-            if (!getWork().getNoStatsAggregator()) {
-              environmentContext = new EnvironmentContext();
-              environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
-                  StatsSetupConst.TASK);
-            }
-          }
-          updates.add(new Partition(table, tPart));
+      Hive db = getHive();
+      Table tbl = getTable(db);
 
-          if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
-            console.printInfo("Partition " + tableFullName + partn.getSpec() +
-            " stats: [" + toString(parameters) + ']');
-          }
-          LOG.info("Partition {}{} stats: [{}]", tableFullName, partn.getSpec(),
-            toString(parameters));
-        }
-        if (!updates.isEmpty()) {
-          db.alterPartitions(tableFullName, updates, environmentContext);
+      for (IStatsProcessor task : processors) {
+        task.setDpPartSpecs(dpPartSpecs);
+        ret = task.process(db, tbl);
+        if (ret != 0) {
+          return ret;
         }
       }
-
     } catch (Exception e) {
-      console.printInfo("[Warning] could not update stats.",
-          "Failed with exception " + e.getMessage() + "\n"
-              + StringUtils.stringifyException(e));
-
-      // Fail the query if the stats are supposed to be reliable
-      if (work.isStatsReliable()) {
-        ret = 1;
-      }
-    } finally {
-      if (statsAggregator != null) {
-        statsAggregator.closeConnection(scc);
-      }
+      LOG.error("Failed to run stats task", e);
+      return 1;
     }
-    // The return value of 0 indicates success,
-    // anything else indicates failure
-    return ret;
+    return 0;
   }
 
-  private String getAggregationPrefix(Table table, Partition partition)
-      throws MetaException {
 
-    // prefix is of the form dbName.tblName
-    String prefix = StatsUtils.getFullyQualifiedTableName(table.getDbName(),
-        org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.encodeTableName(table.getTableName()));
-    if (partition != null) {
-      return Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+  private Table getTable(Hive db) throws SemanticException, HiveException {
+    Table tbl = work.getTable();
+    // FIXME for ctas this is still needed because location is not set sometimes
+    if (tbl.getSd().getLocation() == null) {
+      tbl = db.getTable(work.getFullTableName());
     }
-    return prefix;
+    return tbl;
   }
 
-  private StatsAggregator createStatsAggregator(StatsCollectionContext scc, HiveConf conf) throws HiveException {
-    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
-    if (factory == null) {
-      throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
-    }
-    // initialize stats publishing table for noscan which has only stats task
-    // the rest of MR task following stats task initializes it in ExecDriver.java
-    StatsPublisher statsPublisher = factory.getStatsPublisher();
-    if (!statsPublisher.init(scc)) { // creating stats table if not exists
-      throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
-    }
-
-    // manufacture a StatsAggregator
-    StatsAggregator statsAggregator = factory.getStatsAggregator();
-    if (!statsAggregator.connect(scc)) {
-      throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl));
-    }
-    return statsAggregator;
-  }
-
-  private StatsCollectionContext getContext() throws HiveException {
-
-    StatsCollectionContext scc = new StatsCollectionContext(conf);
-    Task sourceTask = getWork().getSourceTask();
-    if (sourceTask == null) {
-      throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
-    }
-    scc.setTask(sourceTask);
-    scc.setStatsTmpDir(this.getWork().getStatsTmpDir());
-    return scc;
-  }
-
-  private boolean existStats(Map<String, String> parameters) {
-    return parameters.containsKey(StatsSetupConst.ROW_COUNT)
-        || parameters.containsKey(StatsSetupConst.NUM_FILES)
-        || parameters.containsKey(StatsSetupConst.TOTAL_SIZE)
-        || parameters.containsKey(StatsSetupConst.RAW_DATA_SIZE)
-        || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
-  }
-
-  private void updateStats(StatsAggregator statsAggregator,
-      Map<String, String> parameters, String prefix, boolean atomic)
-      throws HiveException {
-
-    String aggKey = prefix.endsWith(Path.SEPARATOR) ? prefix : prefix + Path.SEPARATOR;
-
-    for (String statType : StatsSetupConst.statsRequireCompute) {
-      String value = statsAggregator.aggregateStats(aggKey, statType);
-      if (value != null && !value.isEmpty()) {
-        long longValue = Long.parseLong(value);
-
-        if (work.getLoadTableDesc() != null &&
-                (work.getLoadTableDesc().getLoadFileType() != LoadFileType.REPLACE_ALL)) {
-          String originalValue = parameters.get(statType);
-          if (originalValue != null) {
-            longValue += Long.parseLong(originalValue); // todo: invalid + valid = invalid
-          }
-        }
-        parameters.put(statType, String.valueOf(longValue));
-      } else {
-        if (atomic) {
-          throw new HiveException(ErrorMsg.STATSAGGREGATOR_MISSED_SOMESTATS, statType);
-        }
-      }
-    }
+  @Override
+  public StageType getType() {
+    return StageType.STATS;
   }
 
-  private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
-      StorageDescriptor desc) throws MetaException {
-    /**
-     * calculate fast statistics
-     */
-    FileStatus[] partfileStatus = wh.getFileStatusesForSD(desc);
-    updateQuickStats(parameters, partfileStatus);
+  @Override
+  public String getName() {
+    return "STATS TASK";
   }
 
-  private void updateQuickStats(Map<String, String> parameters,
-      FileStatus[] partfileStatus) throws MetaException {
-    MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
-  }
+  private Collection<Partition> dpPartSpecs;
 
-  private String toString(Map<String, String> parameters) {
-    StringBuilder builder = new StringBuilder();
-    for (String statType : StatsSetupConst.supportedStats) {
-      String value = parameters.get(statType);
-      if (value != null) {
-        if (builder.length() > 0) {
-          builder.append(", ");
-        }
-        builder.append(statType).append('=').append(value);
-      }
+  @Override
+  protected void receiveFeed(FeedType feedType, Object feedValue) {
+    // this method should be called by MoveTask when there are dynamic
+    // partitions generated
+    if (feedType == FeedType.DYNAMIC_PARTITIONS) {
+      dpPartSpecs = (Collection<Partition>) feedValue;
     }
-    return builder.toString();
   }
 
-  /**
-   * Get the list of partitions that need to update statistics.
-   * TODO: we should reuse the Partitions generated at compile time
-   * since getting the list of partitions is quite expensive.
-   *
-   * @return a list of partitions that need to update statistics.
-   * @throws HiveException
-   */
-  private List<Partition> getPartitionsList(Hive db) throws HiveException {
-    if (work.getLoadFileDesc() != null) {
-      return null; //we are in CTAS, so we know there are no partitions
-    }
-
-    List<Partition> list = new ArrayList<Partition>();
+  public static ExecutorService newThreadPool(HiveConf conf) {
+    int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
 
-    if (work.getTableSpecs() != null) {
-
-      // ANALYZE command
-      TableSpec tblSpec = work.getTableSpecs();
-      table = tblSpec.tableHandle;
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      // get all partitions that matches with the partition spec
-      List<Partition> partitions = tblSpec.partitions;
-      if (partitions != null) {
-        for (Partition partn : partitions) {
-          list.add(partn);
-        }
-      }
-    } else if (work.getLoadTableDesc() != null) {
-
-      // INSERT OVERWRITE command
-      LoadTableDesc tbd = work.getLoadTableDesc();
-      table = db.getTable(tbd.getTable().getTableName());
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-        // If no dynamic partitions are generated, dpPartSpecs may not be initialized
-        if (dpPartSpecs != null) {
-          // load the list of DP partitions and return the list of partition specs
-          list.addAll(dpPartSpecs);
-        }
-      } else { // static partition
-        Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
-        list.add(partn);
-      }
-    }
-    return list;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StatsNoJobTask-Thread-%d").build());
+    LOG.info("Initialized threadpool for stats computation with {} threads", numThreads);
+    return executor;
   }
 }
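
The replacement StatsTask above is essentially a dispatcher: it resolves the target table, hands the dynamic-partition feed to every registered IStatsProcessor, and returns the first non-zero code a processor reports; the per-partition aggregation, quick-stats and aggregator-cleanup code deleted here presumably moves behind those processor implementations. A minimal sketch of that control flow follows; IStatsProcessor, setDpPartSpecs and process(db, tbl) appear in the hunk, but the trimmed interface and class below are illustrative stand-ins, not the real Hive types.

    import java.util.Collection;
    import java.util.List;

    // Stand-in for IStatsProcessor, reduced to the two calls visible above.
    interface StatsProcessorSketch {
      void setDpPartSpecs(Collection<?> dpPartSpecs);
      int process(Object db, Object table) throws Exception;
    }

    class StatsDispatchSketch {
      private final List<StatsProcessorSketch> processors;
      private Collection<?> dpPartSpecs; // fed by MoveTask via the DYNAMIC_PARTITIONS feed

      StatsDispatchSketch(List<StatsProcessorSketch> processors) {
        this.processors = processors;
      }

      // Analogue of receiveFeed(FeedType.DYNAMIC_PARTITIONS, ...).
      void receiveDynamicPartitions(Collection<?> parts) {
        this.dpPartSpecs = parts;
      }

      int execute(Object db, Object table) {
        try {
          for (StatsProcessorSketch p : processors) {
            p.setDpPartSpecs(dpPartSpecs);
            int ret = p.process(db, table); // basic, footer (no-scan) or column stats
            if (ret != 0) {
              return ret;                   // first failing processor aborts the stats task
            }
          }
        } catch (Exception e) {
          return 1;                         // mirrors the catch-all "return 1" in the patch
        }
        return 0;
      }
    }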

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index ab495cf..75603ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -349,6 +349,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     final List<Task<? extends Serializable>> leafTasks = new ArrayList<Task<?>>();
 
     NodeUtils.iterateTask(rootTasks, Task.class, new NodeUtils.Function<Task>() {
+      @Override
       public void apply(Task task) {
         List dependents = task.getDependentTasks();
         if (dependents == null || dependents.isEmpty()) {
@@ -648,4 +649,5 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
   public boolean canExecuteInParallel(){
     return true;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 36a5eff..e22dc25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -53,8 +53,6 @@ import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -98,10 +96,7 @@ public final class TaskFactory {
 
     taskvec.add(new TaskTuple<MapredLocalWork>(MapredLocalWork.class,
         MapredLocalTask.class));
-    taskvec.add(new TaskTuple<StatsWork>(StatsWork.class,
-        StatsTask.class));
-    taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
-    taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
+    taskvec.add(new TaskTuple<StatsWork>(StatsWork.class, StatsTask.class));
     taskvec.add(new TaskTuple<ColumnStatsUpdateWork>(ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class));
     taskvec.add(new TaskTuple<MergeFileWork>(MergeFileWork.class,
         MergeFileTask.class));
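
With StatsNoJobWork and ColumnStatsWork gone from the registry, StatsWork is now the only work class that resolves to a stats task, and every flavor of stats gathering (basic, footer/no-scan, column) is configured on that single object. The sketch below shows how a planner obtains the merged task; the constructors and setters are the ones used in the GenMRTableScan1 hunk further down, while the wrapper class and method names are illustrative.

    import java.io.Serializable;
    import java.util.Set;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
    import org.apache.hadoop.hive.ql.plan.StatsWork;

    class MergedStatsTaskPlanning {

      // Footer-scan path (ANALYZE ... COMPUTE STATISTICS noscan on footer-readable
      // formats): no MR/Tez job, the stats task reads file footers directly.
      static Task<StatsWork> footerScanTask(Table table, Set<Partition> parts, HiveConf conf) {
        StatsWork statWork = new StatsWork(table, conf);
        statWork.setFooterScan();
        statWork.addInputPartitions(parts);
        return TaskFactory.get(statWork, conf);
      }

      // Aggregator path: a TableScan job publishes stats and a single StatsTask
      // aggregates them afterwards.
      static Task<StatsWork> aggregatedStatsTask(Table table,
          Task<? extends Serializable> sourceTask, HiveConf conf) {
        BasicStatsWork basicWork = new BasicStatsWork(table.getTableSpec());
        StatsWork statsWork = new StatsWork(table, basicWork, conf);
        statsWork.setSourceTask(sourceTask);
        Task<StatsWork> statsTask = TaskFactory.get(statsWork, conf);
        sourceTask.addDependentTask(statsTask);
        return statsTask;
      }
    }

The real GenMRTableScan1 code additionally calls collectStatsFromAggregator(op.getConf()), presumably so the task knows which operator's gathered statistics to pick up.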

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 71fa42c..00590e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -153,6 +153,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.IStatsGatherDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -161,7 +162,6 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.Adjacency;
 import org.apache.hadoop.hive.ql.plan.api.Graph;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -1620,12 +1620,14 @@ public final class Utilities {
     return removeTempOrDuplicateFiles(
         fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept);
   }
-  
+
   private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
     FileStatus[] items = fs.listStatus(path);
     // remove empty directory since DP insert should not generate empty partitions.
     // empty directories could be generated by crashed Task/ScriptOperator
-    if (items.length != 0) return false;
+    if (items.length != 0) {
+      return false;
+    }
     if (!fs.delete(path, true)) {
       LOG.error("Cannot delete empty directory {}", path);
       throw new IOException("Cannot delete empty directory " + path);
@@ -3607,7 +3609,9 @@ public final class Utilities {
 
       if (op instanceof FileSinkOperator) {
         FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
-        if (fdesc.isMmTable()) continue; // No need to create for MM tables
+        if (fdesc.isMmTable()) {
+          continue; // No need to create for MM tables
+        }
         Path tempDir = fdesc.getDirName();
         if (tempDir != null) {
           Path tempPath = Utilities.toTempPath(tempDir);
@@ -3923,10 +3927,8 @@ public final class Utilities {
     for (Operator<? extends OperatorDesc> op : ops) {
       OperatorDesc desc = op.getConf();
       String statsTmpDir = null;
-      if (desc instanceof FileSinkDesc) {
-         statsTmpDir = ((FileSinkDesc)desc).getStatsTmpDir();
-      } else if (desc instanceof TableScanDesc) {
-        statsTmpDir = ((TableScanDesc) desc).getTmpStatsDir();
+      if (desc instanceof IStatsGatherDesc) {
+        statsTmpDir = ((IStatsGatherDesc) desc).getTmpStatsDir();
       }
       if (statsTmpDir != null && !statsTmpDir.isEmpty()) {
         statsTmpDirs.add(statsTmpDir);
@@ -4078,7 +4080,9 @@ public final class Utilities {
   }
 
   private static Path[] statusToPath(FileStatus[] statuses) {
-    if (statuses == null) return null;
+    if (statuses == null) {
+      return null;
+    }
     Path[] paths = new Path[statuses.length];
     for (int i = 0; i < statuses.length; ++i) {
       paths[i] = statuses[i].getPath();
@@ -4108,7 +4112,9 @@ public final class Utilities {
       Utilities.FILE_OP_LOGGER.trace("Looking at {} from {}", subDir, lfsPath);
 
       // If sorted, we'll skip a bunch of files.
-      if (lastRelDir != null && subDir.startsWith(lastRelDir)) continue;
+      if (lastRelDir != null && subDir.startsWith(lastRelDir)) {
+        continue;
+      }
       int startIx = skipLevels > 0 ? -1 : 0;
       for (int i = 0; i < skipLevels; ++i) {
         startIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
@@ -4118,7 +4124,9 @@ public final class Utilities {
           break;
         }
       }
-      if (startIx == -1) continue;
+      if (startIx == -1) {
+        continue;
+      }
       int endIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
       if (endIx == -1) {
         Utilities.FILE_OP_LOGGER.info("Expected level of nesting ({}) is not present in"
@@ -4127,7 +4135,9 @@ public final class Utilities {
       }
       lastRelDir = subDir = subDir.substring(0, endIx);
       Path candidate = new Path(relRoot, subDir);
-      if (!filter.accept(candidate)) continue;
+      if (!filter.accept(candidate)) {
+        continue;
+      }
       results.add(fs.makeQualified(candidate));
     }
     return results.toArray(new Path[results.size()]);
@@ -4168,7 +4178,7 @@ public final class Utilities {
 
   public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
       String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException {
-    if (CollectionUtils.isEmpty(commitPaths)) {
+    if (commitPaths.isEmpty()) {
       return;
     }
     // We assume one FSOP per task (per specPath), so we create it in specPath.
@@ -4288,11 +4298,15 @@ public final class Utilities {
       throw new HiveException("The following files were committed but not found: " + committed);
     }
 
-    if (mmDirectories.isEmpty()) return;
+    if (mmDirectories.isEmpty()) {
+      return;
+    }
 
     // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing,
     //       so maintain parity here by not calling it at all.
-    if (lbLevels != 0) return;
+    if (lbLevels != 0) {
+      return;
+    }
     // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
     // doesn't need tocheck anything except path and directory status for MM directories.
     FileStatus[] finalResults = new FileStatus[mmDirectories.size()];
@@ -4320,7 +4334,9 @@ public final class Utilities {
     for (FileStatus child : fs.listStatus(dir)) {
       Path childPath = child.getPath();
       if (unionSuffix == null) {
-        if (committed.remove(childPath.toString())) continue; // A good file.
+        if (committed.remove(childPath.toString())) {
+          continue; // A good file.
+        }
         deleteUncommitedFile(childPath, fs);
       } else if (!child.isDirectory()) {
         if (committed.contains(childPath.toString())) {

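Besides the brace and whitespace cleanups, the substantive Utilities change above is that the temporary statistics directory is now obtained through the new IStatsGatherDesc interface instead of separate instanceof checks for FileSinkDesc and TableScanDesc. A minimal sketch of that interface-driven lookup, using a stand-in interface (only getTmpStatsDir() is taken from the hunk; the class and method around it are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    class StatsTmpDirLookup {

      // Stand-in for org.apache.hadoop.hive.ql.plan.IStatsGatherDesc; in the patch
      // both FileSinkDesc and TableScanDesc implement it.
      interface StatsGatherDesc {
        String getTmpStatsDir();
      }

      static List<String> collectStatsTmpDirs(List<Object> operatorDescs) {
        List<String> statsTmpDirs = new ArrayList<>();
        for (Object desc : operatorDescs) {
          if (desc instanceof StatsGatherDesc) { // one check replaces the two instanceof branches
            String dir = ((StatsGatherDesc) desc).getTmpStatsDir();
            if (dir != null && !dir.isEmpty()) {
              statsTmpDirs.add(dir);
            }
          }
        }
        return statsTmpDirs;
      }
    }
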
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index cf4df9b..cceea01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1794,7 +1794,9 @@ public class Hive {
       }
 
       // column stats will be inaccurate
-      StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
+      if (!hasFollowingStatsTask) {
+        StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
+      }
 
       // recreate the partition if it existed before
       if (isSkewedStoreAsSubdir) {
@@ -1813,8 +1815,8 @@ public class Hive {
       if (oldPart == null) {
         newTPart.getTPartition().setParameters(new HashMap<String,String>());
         if (this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
-          StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(), null,
-              StatsSetupConst.TRUE);
+          StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(),
+              MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
         }
         MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
         try {
@@ -2299,7 +2301,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
 
     //column stats will be inaccurate
-    StatsSetupConst.clearColumnStatsState(tbl.getParameters());
+    if (!hasFollowingStatsTask) {
+      StatsSetupConst.clearColumnStatsState(tbl.getParameters());
+    }
 
     try {
       if (isSkewedStoreAsSubdir) {

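The two Hive.java hunks above make the same adjustment in the partition and table load paths: column statistics are cleared only when no stats task follows the load, since the merged StatsTask introduced by this patch will recompute them anyway, and a newly created partition now starts with stats marked accurate for all of its columns. A tiny sketch of that guard, assuming the caller already knows whether a stats task is queued (the wrapper method is illustrative):

    import java.util.Map;

    import org.apache.hadoop.hive.common.StatsSetupConst;

    class ColumnStatsInvalidation {
      // Invalidate column stats only when nothing downstream will refresh them.
      static void maybeClear(Map<String, String> params, boolean hasFollowingStatsTask) {
        if (!hasFollowingStatsTask) {
          StatsSetupConst.clearColumnStatsState(params);
        }
      }
    }
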
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 78e83af..1c26200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -196,7 +197,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
     Collections.sort(tableNames);
     return tableNames;
   }
-  
+
   @Override
   public List<TableMeta> getTableMeta(String dbPatterns, String tablePatterns, List<String> tableTypes)
       throws MetaException {
@@ -235,7 +236,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
     }
     return tableMetas;
   }
-  
+
   private boolean matchesAny(String string, List<Matcher> matchers) {
     for (Matcher matcher : matchers) {
       if (matcher.reset(string).matches()) {
@@ -399,6 +400,8 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
       EnvironmentContext envContext) throws AlreadyExistsException, InvalidObjectException,
       MetaException, NoSuchObjectException, TException {
 
+    boolean isVirtualTable = tbl.getTableName().startsWith(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX);
+
     SessionState ss = SessionState.get();
     if (ss == null) {
       throw new MetaException("No current SessionState, cannot create temporary table"
@@ -434,6 +437,10 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
 
     // Add temp table info to current session
     Table tTable = new Table(tbl);
+    if (!isVirtualTable) {
+      StatsSetupConst.setStatsStateForCreateTable(tbl.getParameters(),
+          org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getColumnNamesForTable(tbl), StatsSetupConst.TRUE);
+    }
     if (tables == null) {
       tables = new HashMap<String, Table>();
       ss.getTempTables().put(dbName, tables);
@@ -466,8 +473,6 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
     }
 
     org.apache.hadoop.hive.metastore.api.Table newtCopy = deepCopyAndLowerCaseTable(newt);
-    MetaStoreUtils.updateTableStatsFast(newtCopy,
-        getWh().getFileStatusesForSD(newtCopy.getSd()), false, true, envContext);
     Table newTable = new Table(newtCopy);
     String newDbName = newTable.getDbName();
     String newTableName = newTable.getTableName();
@@ -656,7 +661,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
   public static Map<String, Table> getTempTablesForDatabase(String dbName) {
     return getTempTables().get(dbName);
   }
-  
+
   public static Map<String, Map<String, Table>> getTempTables() {
     SessionState ss = SessionState.get();
     if (ss == null) {
@@ -712,6 +717,13 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
           ssTableColStats);
     }
     mergeColumnStats(ssTableColStats, colStats);
+
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj obj : colStats.getStatsObj()) {
+      colNames.add(obj.getColName());
+    }
+    org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName);
+    StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index 768640c..4fb39fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -23,12 +23,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -41,10 +39,10 @@ import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.mapred.InputFormat;
 
 /**
@@ -67,8 +65,8 @@ public class GenMRTableScan1 implements NodeProcessor {
     TableScanOperator op = (TableScanOperator) nd;
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     ParseContext parseCtx = ctx.getParseCtx();
-    Class<? extends InputFormat> inputFormat = op.getConf().getTableMetadata()
-        .getInputFormatClass();
+    Table table = op.getConf().getTableMetadata();
+    Class<? extends InputFormat> inputFormat = table.getInputFormatClass();
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
 
     // create a dummy MapReduce task
@@ -93,19 +91,17 @@ public class GenMRTableScan1 implements NodeProcessor {
             // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
 
             // There will not be any MR or Tez job above this task
-            StatsNoJobWork snjWork = new StatsNoJobWork(op.getConf().getTableMetadata().getTableSpec());
-            snjWork.setStatsReliable(parseCtx.getConf().getBoolVar(
-                HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+            StatsWork statWork = new StatsWork(table, parseCtx.getConf());
+            statWork.setFooterScan();
+
             // If partition is specified, get pruned partition list
             Set<Partition> confirmedParts = GenMapRedUtils.getConfirmedPartitionsForScan(op);
             if (confirmedParts.size() > 0) {
-              Table source = op.getConf().getTableMetadata();
               List<String> partCols = GenMapRedUtils.getPartitionColumns(op);
-              PrunedPartitionList partList = new PrunedPartitionList(source, confirmedParts,
-                  partCols, false);
-              snjWork.setPrunedPartitionList(partList);
+              PrunedPartitionList partList = new PrunedPartitionList(table, confirmedParts, partCols, false);
+              statWork.addInputPartitions(partList.getPartitions());
             }
-            Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseCtx.getConf());
+            Task<StatsWork> snjTask = TaskFactory.get(statWork, parseCtx.getConf());
             ctx.setCurrTask(snjTask);
             ctx.setCurrTopOp(null);
             ctx.getRootTasks().clear();
@@ -115,14 +111,15 @@ public class GenMRTableScan1 implements NodeProcessor {
             // The plan consists of a simple MapRedTask followed by a StatsTask.
             // The MR task is just a simple TableScanOperator
 
-            StatsWork statsWork = new StatsWork(op.getConf().getTableMetadata().getTableSpec());
-            statsWork.setAggKey(op.getConf().getStatsAggPrefix());
-            statsWork.setStatsTmpDir(op.getConf().getTmpStatsDir());
-            statsWork.setSourceTask(currTask);
-            statsWork.setStatsReliable(parseCtx.getConf().getBoolVar(
-                HiveConf.ConfVars.HIVE_STATS_RELIABLE));
-            Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseCtx.getConf());
-            currTask.addDependentTask(statsTask);
+            BasicStatsWork statsWork = new BasicStatsWork(table.getTableSpec());
+
+            statsWork.setNoScanAnalyzeCommand(noScan);
+            StatsWork columnStatsWork = new StatsWork(table, statsWork, parseCtx.getConf());
+            columnStatsWork.collectStatsFromAggregator(op.getConf());
+
+            columnStatsWork.setSourceTask(currTask);
+            Task<StatsWork> columnStatsTask = TaskFactory.get(columnStatsWork, parseCtx.getConf());
+            currTask.addDependentTask(columnStatsTask);
             if (!ctx.getRootTasks().contains(currTask)) {
               ctx.getRootTasks().add(currTask);
             }
@@ -130,10 +127,9 @@ public class GenMRTableScan1 implements NodeProcessor {
             // ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
             // The plan consists of a StatsTask only.
             if (noScan) {
-              statsTask.setParentTasks(null);
-              statsWork.setNoScanAnalyzeCommand(true);
+              columnStatsTask.setParentTasks(null);
               ctx.getRootTasks().remove(currTask);
-              ctx.getRootTasks().add(statsTask);
+              ctx.getRootTasks().add(columnStatsTask);
             }
 
             currWork.getMapWork().setGatheringStats(true);
@@ -147,9 +143,8 @@ public class GenMRTableScan1 implements NodeProcessor {
             Set<Partition> confirmedPartns = GenMapRedUtils
                 .getConfirmedPartitionsForScan(op);
             if (confirmedPartns.size() > 0) {
-              Table source = op.getConf().getTableMetadata();
               List<String> partCols = GenMapRedUtils.getPartitionColumns(op);
-              PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, partCols, false);
+              PrunedPartitionList partList = new PrunedPartitionList(table, confirmedPartns, partCols, false);
               GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx, partList);
             } else { // non-partitioned table
               GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx);