Posted to commits@hive.apache.org by ha...@apache.org on 2017/11/07 22:33:58 UTC
[21/22] hive git commit: HIVE-16827 : Merge stats task and column stats task into a single task (Zoltan Haindrich via Ashutosh Chauhan)
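
In outline, this change replaces the separate StatsTask, StatsNoJobTask and ColumnStatsTask with a single StatsTask that drives a list of IStatsProcessor implementations (BasicStatsTask, BasicStatsNoJobTask, ColStatsProcessor), as the StatsTask.java diff below shows. What follows is a minimal sketch of that shape only; the interface and classes in it are simplified stand-ins for illustration, not the actual Hive types.

// Illustrative stand-ins for the merged-task design introduced by HIVE-16827:
// one task owns an ordered list of stats processors instead of three task classes.
import java.util.ArrayList;
import java.util.List;

public class MergedStatsTaskSketch {

  /** Simplified stand-in for the IStatsProcessor interface used by the new StatsTask. */
  interface StatsProcessor {
    int process(); // 0 on success, non-zero on failure
  }

  static class BasicStatsProcessor implements StatsProcessor {
    @Override
    public int process() {
      System.out.println("gather basic stats (numFiles, numRows, totalSize, rawDataSize)");
      return 0;
    }
  }

  static class ColumnStatsProcessor implements StatsProcessor {
    @Override
    public int process() {
      System.out.println("gather per-column stats and persist them to the metastore");
      return 0;
    }
  }

  private final List<StatsProcessor> processors = new ArrayList<>();

  // Mirrors the new StatsTask.initialize(): basic stats first, column stats when requested.
  void initialize(boolean hasColStats) {
    processors.add(new BasicStatsProcessor());
    if (hasColStats) {
      processors.add(new ColumnStatsProcessor());
    }
  }

  // Mirrors the new StatsTask.execute(): run each processor in order, stop on first failure.
  int execute() {
    for (StatsProcessor p : processors) {
      int ret = p.process();
      if (ret != 0) {
        return ret;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    MergedStatsTaskSketch task = new MergedStatsTaskSketch();
    task.initialize(true);
    System.exit(task.execute());
  }
}

The real initialize() in the diff also handles the footer-scan variant (BasicStatsNoJobTask), sets followedColStats on the basic-stats processor, and calls initialize(opContext) on each processor; the real execute() passes the Hive handle and the target Table to each process() call.
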
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
deleted file mode 100644
index 1f28688..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
-import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
-import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
-import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ColumnStatsTask implementation.
- **/
-
-public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializable {
- private static final long serialVersionUID = 1L;
- private FetchOperator ftOp;
- private static transient final Logger LOG = LoggerFactory.getLogger(ColumnStatsTask.class);
-
- public ColumnStatsTask() {
- super();
- }
-
- @Override
- public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
- CompilationOpContext opContext) {
- super.initialize(queryState, queryPlan, ctx, opContext);
- work.initializeForFetch(opContext);
- try {
- JobConf job = new JobConf(conf);
- ftOp = new FetchOperator(work.getfWork(), job);
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- throw new RuntimeException(e);
- }
- }
-
- private void unpackBooleanStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
- long v = ((LongObjectInspector) oi).get(o);
- if (fName.equals("counttrues")) {
- statsObj.getStatsData().getBooleanStats().setNumTrues(v);
- } else if (fName.equals("countfalses")) {
- statsObj.getStatsData().getBooleanStats().setNumFalses(v);
- } else if (fName.equals("countnulls")) {
- statsObj.getStatsData().getBooleanStats().setNumNulls(v);
- }
- }
-
- @SuppressWarnings("serial")
- class UnsupportedDoubleException extends Exception {
- }
-
- private void unpackDoubleStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
- if (fName.equals("countnulls")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getDoubleStats().setNumNulls(v);
- } else if (fName.equals("numdistinctvalues")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getDoubleStats().setNumDVs(v);
- } else if (fName.equals("max")) {
- double d = ((DoubleObjectInspector) oi).get(o);
- if (Double.isInfinite(d) || Double.isNaN(d)) {
- throw new UnsupportedDoubleException();
- }
- statsObj.getStatsData().getDoubleStats().setHighValue(d);
- } else if (fName.equals("min")) {
- double d = ((DoubleObjectInspector) oi).get(o);
- if (Double.isInfinite(d) || Double.isNaN(d)) {
- throw new UnsupportedDoubleException();
- }
- statsObj.getStatsData().getDoubleStats().setLowValue(d);
- } else if (fName.equals("ndvbitvector")) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getDoubleStats().setBitVectors(buf);
- }
- }
-
- private void unpackDecimalStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
- if (fName.equals("countnulls")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getDecimalStats().setNumNulls(v);
- } else if (fName.equals("numdistinctvalues")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getDecimalStats().setNumDVs(v);
- } else if (fName.equals("max")) {
- HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getDecimalStats().setHighValue(convertToThriftDecimal(d));
- } else if (fName.equals("min")) {
- HiveDecimal d = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getDecimalStats().setLowValue(convertToThriftDecimal(d));
- } else if (fName.equals("ndvbitvector")) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getDecimalStats().setBitVectors(buf);
- }
- }
-
- private Decimal convertToThriftDecimal(HiveDecimal d) {
- return new Decimal(ByteBuffer.wrap(d.unscaledValue().toByteArray()), (short)d.scale());
- }
-
- private void unpackLongStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
- if (fName.equals("countnulls")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getLongStats().setNumNulls(v);
- } else if (fName.equals("numdistinctvalues")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getLongStats().setNumDVs(v);
- } else if (fName.equals("max")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getLongStats().setHighValue(v);
- } else if (fName.equals("min")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getLongStats().setLowValue(v);
- } else if (fName.equals("ndvbitvector")) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getLongStats().setBitVectors(buf);
- }
- }
-
- private void unpackStringStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
- if (fName.equals("countnulls")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getStringStats().setNumNulls(v);
- } else if (fName.equals("numdistinctvalues")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getStringStats().setNumDVs(v);
- } else if (fName.equals("avglength")) {
- double d = ((DoubleObjectInspector) oi).get(o);
- statsObj.getStatsData().getStringStats().setAvgColLen(d);
- } else if (fName.equals("maxlength")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getStringStats().setMaxColLen(v);
- } else if (fName.equals("ndvbitvector")) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getStringStats().setBitVectors(buf);
- }
- }
-
- private void unpackBinaryStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
- if (fName.equals("countnulls")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getBinaryStats().setNumNulls(v);
- } else if (fName.equals("avglength")) {
- double d = ((DoubleObjectInspector) oi).get(o);
- statsObj.getStatsData().getBinaryStats().setAvgColLen(d);
- } else if (fName.equals("maxlength")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getBinaryStats().setMaxColLen(v);
- }
- }
-
- private void unpackDateStats(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj statsObj) {
- if (fName.equals("countnulls")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getDateStats().setNumNulls(v);
- } else if (fName.equals("numdistinctvalues")) {
- long v = ((LongObjectInspector) oi).get(o);
- statsObj.getStatsData().getDateStats().setNumDVs(v);
- } else if (fName.equals("max")) {
- DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
- statsObj.getStatsData().getDateStats().setHighValue(new Date(v.getDays()));
- } else if (fName.equals("min")) {
- DateWritable v = ((DateObjectInspector) oi).getPrimitiveWritableObject(o);
- statsObj.getStatsData().getDateStats().setLowValue(new Date(v.getDays()));
- } else if (fName.equals("ndvbitvector")) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- byte[] buf = ((BinaryObjectInspector) poi).getPrimitiveJavaObject(o);
- statsObj.getStatsData().getDateStats().setBitVectors(buf);
- }
- }
-
- private void unpackPrimitiveObject (ObjectInspector oi, Object o, String fieldName,
- ColumnStatisticsObj statsObj) throws UnsupportedDoubleException {
- if (o == null) {
- return;
- }
- // First infer the type of object
- if (fieldName.equals("columntype")) {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
- String s = ((StringObjectInspector) poi).getPrimitiveJavaObject(o);
- ColumnStatisticsData statsData = new ColumnStatisticsData();
-
- if (s.equalsIgnoreCase("long")) {
- LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
- statsData.setLongStats(longStats);
- statsObj.setStatsData(statsData);
- } else if (s.equalsIgnoreCase("double")) {
- DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector();
- statsData.setDoubleStats(doubleStats);
- statsObj.setStatsData(statsData);
- } else if (s.equalsIgnoreCase("string")) {
- StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector();
- statsData.setStringStats(stringStats);
- statsObj.setStatsData(statsData);
- } else if (s.equalsIgnoreCase("boolean")) {
- BooleanColumnStatsData booleanStats = new BooleanColumnStatsData();
- statsData.setBooleanStats(booleanStats);
- statsObj.setStatsData(statsData);
- } else if (s.equalsIgnoreCase("binary")) {
- BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
- statsData.setBinaryStats(binaryStats);
- statsObj.setStatsData(statsData);
- } else if (s.equalsIgnoreCase("decimal")) {
- DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector();
- statsData.setDecimalStats(decimalStats);
- statsObj.setStatsData(statsData);
- } else if (s.equalsIgnoreCase("date")) {
- DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector();
- statsData.setDateStats(dateStats);
- statsObj.setStatsData(statsData);
- }
- } else {
- // invoke the right unpack method depending on data type of the column
- if (statsObj.getStatsData().isSetBooleanStats()) {
- unpackBooleanStats(oi, o, fieldName, statsObj);
- } else if (statsObj.getStatsData().isSetLongStats()) {
- unpackLongStats(oi, o, fieldName, statsObj);
- } else if (statsObj.getStatsData().isSetDoubleStats()) {
- unpackDoubleStats(oi,o,fieldName, statsObj);
- } else if (statsObj.getStatsData().isSetStringStats()) {
- unpackStringStats(oi, o, fieldName, statsObj);
- } else if (statsObj.getStatsData().isSetBinaryStats()) {
- unpackBinaryStats(oi, o, fieldName, statsObj);
- } else if (statsObj.getStatsData().isSetDecimalStats()) {
- unpackDecimalStats(oi, o, fieldName, statsObj);
- } else if (statsObj.getStatsData().isSetDateStats()) {
- unpackDateStats(oi, o, fieldName, statsObj);
- }
- }
- }
-
- private void unpackStructObject(ObjectInspector oi, Object o, String fName,
- ColumnStatisticsObj cStatsObj) throws UnsupportedDoubleException {
- if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
- throw new RuntimeException("Invalid object datatype : " + oi.getCategory().toString());
- }
-
- StructObjectInspector soi = (StructObjectInspector) oi;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- List<Object> list = soi.getStructFieldsDataAsList(o);
-
- for (int i = 0; i < fields.size(); i++) {
- // Get the field objectInspector, fieldName and the field object.
- ObjectInspector foi = fields.get(i).getFieldObjectInspector();
- Object f = (list == null ? null : list.get(i));
- String fieldName = fields.get(i).getFieldName();
-
- if (foi.getCategory() == ObjectInspector.Category.PRIMITIVE) {
- unpackPrimitiveObject(foi, f, fieldName, cStatsObj);
- } else {
- unpackStructObject(foi, f, fieldName, cStatsObj);
- }
- }
- }
-
- private List<ColumnStatistics> constructColumnStatsFromPackedRows(
- Hive db) throws HiveException, MetaException, IOException {
-
- String currentDb = work.getCurrentDatabaseName();
- String tableName = work.getColStats().getTableName();
- String partName = null;
- List<String> colName = work.getColStats().getColName();
- List<String> colType = work.getColStats().getColType();
- boolean isTblLevel = work.getColStats().isTblLevel();
-
- List<ColumnStatistics> stats = new ArrayList<ColumnStatistics>();
- InspectableObject packedRow;
- Table tbl = db.getTable(currentDb, tableName);
- while ((packedRow = ftOp.getNextRow()) != null) {
- if (packedRow.oi.getCategory() != ObjectInspector.Category.STRUCT) {
- throw new HiveException("Unexpected object type encountered while unpacking row");
- }
-
- List<ColumnStatisticsObj> statsObjs = new ArrayList<ColumnStatisticsObj>();
- StructObjectInspector soi = (StructObjectInspector) packedRow.oi;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- List<Object> list = soi.getStructFieldsDataAsList(packedRow.o);
-
- List<FieldSchema> partColSchema = tbl.getPartCols();
- // Partition columns are appended at end, we only care about stats column
- int numOfStatCols = isTblLevel ? fields.size() : fields.size() - partColSchema.size();
- for (int i = 0; i < numOfStatCols; i++) {
- // Get the field objectInspector, fieldName and the field object.
- ObjectInspector foi = fields.get(i).getFieldObjectInspector();
- Object f = (list == null ? null : list.get(i));
- String fieldName = fields.get(i).getFieldName();
- ColumnStatisticsObj statsObj = new ColumnStatisticsObj();
- statsObj.setColName(colName.get(i));
- statsObj.setColType(colType.get(i));
- try {
- unpackStructObject(foi, f, fieldName, statsObj);
- statsObjs.add(statsObj);
- } catch (UnsupportedDoubleException e) {
- // due to infinity or nan.
- LOG.info("Because {} is infinite or NaN, we skip stats.", colName.get(i));
- }
- }
-
- if (!isTblLevel) {
- List<String> partVals = new ArrayList<String>();
- // Iterate over partition columns to figure out partition name
- for (int i = fields.size() - partColSchema.size(); i < fields.size(); i++) {
- Object partVal = ((PrimitiveObjectInspector)fields.get(i).getFieldObjectInspector()).
- getPrimitiveJavaObject(list.get(i));
- partVals.add(partVal == null ? // could be null for default partition
- this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
- }
- partName = Warehouse.makePartName(partColSchema, partVals);
- }
- String [] names = Utilities.getDbTableName(currentDb, tableName);
- ColumnStatisticsDesc statsDesc = getColumnStatsDesc(names[0], names[1], partName, isTblLevel);
- ColumnStatistics colStats = new ColumnStatistics();
- colStats.setStatsDesc(statsDesc);
- colStats.setStatsObj(statsObjs);
- if (!statsObjs.isEmpty()) {
- stats.add(colStats);
- }
- }
- ftOp.clearFetchContext();
- return stats;
- }
-
- private ColumnStatisticsDesc getColumnStatsDesc(String dbName, String tableName,
- String partName, boolean isTblLevel)
- {
- ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
- statsDesc.setDbName(dbName);
- statsDesc.setTableName(tableName);
- statsDesc.setIsTblLevel(isTblLevel);
-
- if (!isTblLevel) {
- statsDesc.setPartName(partName);
- } else {
- statsDesc.setPartName(null);
- }
- return statsDesc;
- }
-
- private int persistColumnStats(Hive db) throws HiveException, MetaException, IOException {
- // Construct a column statistics object from the result
- List<ColumnStatistics> colStats = constructColumnStatsFromPackedRows(db);
- // Persist the column statistics object to the metastore
- // Note, this function is shared for both table and partition column stats.
- if (colStats.isEmpty()) {
- return 0;
- }
- SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStats);
- if (work.getColStats() != null && work.getColStats().getNumBitVector() > 0) {
- request.setNeedMerge(true);
- }
- db.setPartitionColumnStatistics(request);
- return 0;
- }
-
- @Override
- public int execute(DriverContext driverContext) {
- if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
- return 0;
- }
- try {
- Hive db = getHive();
- return persistColumnStats(db);
- } catch (Exception e) {
- LOG.error("Failed to run column stats task", e);
- }
- return 1;
- }
-
- @Override
- public StageType getType() {
- return StageType.COLUMNSTATS;
- }
-
- @Override
- public String getName() {
- return "COLUMNSTATS TASK";
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2331498..b4989f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
-import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
@@ -975,7 +974,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
int bucketProperty =
bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
- int bucketNum =
+ int bucketNum =
BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
writerOffset = 0;
if (multiFileSpray) {
@@ -1452,7 +1451,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
StatsCollectionContext sContext = new StatsCollectionContext(hconf);
- sContext.setStatsTmpDir(conf.getStatsTmpDir());
+ sContext.setStatsTmpDir(conf.getTmpStatsDir());
if (!statsPublisher.connect(sContext)) {
// just return, stats gathering should not block the main query
LOG.error("StatsPublishing error: cannot connect to database");
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
deleted file mode 100644
index c333c49..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.ReflectionUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * StatsNoJobTask is used in cases where stats collection is the only task for the given query (no
- * parent MR or Tez job). It is used in the following cases 1) ANALYZE with noscan for
- * file formats that implement StatsProvidingRecordReader interface: ORC format (implements
- * StatsProvidingRecordReader) stores column statistics for all columns in the file footer. Its much
- * faster to compute the table/partition statistics by reading the footer than scanning all the
- * rows. This task can be used for computing basic stats like numFiles, numRows, fileSize,
- * rawDataSize from ORC footer.
- **/
-public class StatsNoJobTask extends Task<StatsNoJobWork> implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static transient final Logger LOG = LoggerFactory.getLogger(StatsNoJobTask.class);
- private ConcurrentMap<String, Partition> partUpdates;
- private Table table;
- private String tableFullName;
- private JobConf jc = null;
-
- public StatsNoJobTask() {
- super();
- }
-
- @Override
- public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
- CompilationOpContext opContext) {
- super.initialize(queryState, queryPlan, driverContext, opContext);
- jc = new JobConf(conf);
- }
-
- @Override
- public int execute(DriverContext driverContext) {
-
- LOG.info("Executing stats (no job) task");
-
- String tableName = "";
- ExecutorService threadPool = null;
- Hive db = getHive();
- try {
- tableName = work.getTableSpecs().tableName;
- table = db.getTable(tableName);
- int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
- tableFullName = table.getFullyQualifiedName();
- threadPool = Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StatsNoJobTask-Thread-%d")
- .build());
- partUpdates = new MapMaker().concurrencyLevel(numThreads).makeMap();
- LOG.info("Initialized threadpool for stats computation with {} threads", numThreads);
- } catch (HiveException e) {
- LOG.error("Cannot get table {}", tableName, e);
- console.printError("Cannot get table " + tableName, e.toString());
- }
-
- return aggregateStats(threadPool, db);
- }
-
- @Override
- public StageType getType() {
- return StageType.STATS;
- }
-
- @Override
- public String getName() {
- return "STATS-NO-JOB";
- }
-
- class StatsCollection implements Runnable {
-
- private final Partition partn;
-
- public StatsCollection(Partition part) {
- this.partn = part;
- }
-
- @Override
- public void run() {
-
- // get the list of partitions
- org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
- Map<String, String> parameters = tPart.getParameters();
-
- try {
- Path dir = new Path(tPart.getSd().getLocation());
- long numRows = 0;
- long rawDataSize = 0;
- long fileSize = 0;
- long numFiles = 0;
- FileSystem fs = dir.getFileSystem(conf);
- FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
-
- boolean statsAvailable = false;
- for(FileStatus file: fileList) {
- if (!file.isDir()) {
- InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(
- partn.getInputFormatClass(), jc);
- InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0,
- new String[] { partn.getLocation() });
- org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
- inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
- StatsProvidingRecordReader statsRR;
- if (recordReader instanceof StatsProvidingRecordReader) {
- statsRR = (StatsProvidingRecordReader) recordReader;
- rawDataSize += statsRR.getStats().getRawDataSize();
- numRows += statsRR.getStats().getRowCount();
- fileSize += file.getLen();
- numFiles += 1;
- statsAvailable = true;
- }
- recordReader.close();
- }
- }
-
- if (statsAvailable) {
- parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
- parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
- parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
- parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
-
- partUpdates.put(tPart.getSd().getLocation(), new Partition(table, tPart));
-
- // printout console and debug logs
- String threadName = Thread.currentThread().getName();
- String msg = "Partition " + tableFullName + partn.getSpec() + " stats: ["
- + toString(parameters) + ']';
- LOG.debug("{}: {}", threadName, msg);
- console.printInfo(msg);
- } else {
- String threadName = Thread.currentThread().getName();
- String msg = "Partition " + tableFullName + partn.getSpec() + " does not provide stats.";
- LOG.debug("{}: {}", threadName, msg);
- }
- } catch (Exception e) {
- console.printInfo("[Warning] could not update stats for " + tableFullName + partn.getSpec()
- + ".",
- "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
-
- // Before updating the partition params, if any partition params is null
- // and if statsReliable is true then updatePartition() function will fail
- // the task by returning 1
- if (work.isStatsReliable()) {
- partUpdates.put(tPart.getSd().getLocation(), null);
- }
- }
- }
-
- private String toString(Map<String, String> parameters) {
- StringBuilder builder = new StringBuilder();
- for (String statType : StatsSetupConst.supportedStats) {
- String value = parameters.get(statType);
- if (value != null) {
- if (builder.length() > 0) {
- builder.append(", ");
- }
- builder.append(statType).append('=').append(value);
- }
- }
- return builder.toString();
- }
-
- }
-
- private int aggregateStats(ExecutorService threadPool, Hive db) {
- int ret = 0;
-
- try {
- Collection<Partition> partitions = null;
- if (work.getPrunedPartitionList() == null) {
- partitions = getPartitionsList();
- } else {
- partitions = work.getPrunedPartitionList().getPartitions();
- }
-
- // non-partitioned table
- if (partitions == null) {
- org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
- Map<String, String> parameters = tTable.getParameters();
- try {
- Path dir = new Path(tTable.getSd().getLocation());
- LOG.debug("Aggregating stats for " + dir);
- long numRows = 0;
- long rawDataSize = 0;
- long fileSize = 0;
- long numFiles = 0;
- FileSystem fs = dir.getFileSystem(conf);
- FileStatus[] fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
-
- boolean statsAvailable = false;
- for(FileStatus file: fileList) {
- LOG.debug("Computing stats for " + file);
- if (!file.isDir()) {
- InputFormat<?, ?> inputFormat = ReflectionUtil.newInstance(
- table.getInputFormatClass(), jc);
- InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { table
- .getDataLocation().toString() });
- if (file.getLen() == 0) {
- numFiles += 1;
- statsAvailable = true;
- } else {
- org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
- inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
- StatsProvidingRecordReader statsRR;
- if (recordReader instanceof StatsProvidingRecordReader) {
- statsRR = (StatsProvidingRecordReader) recordReader;
- numRows += statsRR.getStats().getRowCount();
- rawDataSize += statsRR.getStats().getRawDataSize();
- fileSize += file.getLen();
- numFiles += 1;
- statsAvailable = true;
- }
- recordReader.close();
- }
- }
- }
-
- if (statsAvailable) {
- parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
- parameters.put(StatsSetupConst.RAW_DATA_SIZE, String.valueOf(rawDataSize));
- parameters.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(fileSize));
- parameters.put(StatsSetupConst.NUM_FILES, String.valueOf(numFiles));
- EnvironmentContext environmentContext = new EnvironmentContext();
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
-
- db.alterTable(table, environmentContext);
-
- String msg = "Table " + tableFullName + " stats: [" + toString(parameters) + ']';
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace(msg);
- }
- console.printInfo(msg);
- LOG.debug("Table {} does not provide stats.", tableFullName);
- }
- } catch (Exception e) {
- console.printInfo("[Warning] could not update stats for " + tableFullName + ".",
- "Failed with exception " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
- }
- } else {
-
- // Partitioned table
- for (Partition partn : partitions) {
- threadPool.execute(new StatsCollection(partn));
- }
-
- LOG.debug("Stats collection waiting for threadpool to shutdown..");
- shutdownAndAwaitTermination(threadPool);
- LOG.debug("Stats collection threadpool shutdown successful.");
-
- ret = updatePartitions(db);
- }
-
- } catch (Exception e) {
- // Fail the query if the stats are supposed to be reliable
- if (work.isStatsReliable()) {
- ret = -1;
- }
- }
-
- // The return value of 0 indicates success,
- // anything else indicates failure
- return ret;
- }
-
- private int updatePartitions(Hive db) throws InvalidOperationException, HiveException {
- if (!partUpdates.isEmpty()) {
- List<Partition> updatedParts = Lists.newArrayList(partUpdates.values());
- if (updatedParts.contains(null) && work.isStatsReliable()) {
- LOG.debug("Stats requested to be reliable. Empty stats found and hence failing the task.");
- return -1;
- } else {
- LOG.debug("Bulk updating partitions..");
- EnvironmentContext environmentContext = new EnvironmentContext();
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
- db.alterPartitions(tableFullName, Lists.newArrayList(partUpdates.values()),
- environmentContext);
- LOG.debug("Bulk updated {} partitions.", partUpdates.values().size());
- }
- }
- return 0;
- }
-
- private void shutdownAndAwaitTermination(ExecutorService threadPool) {
-
- // Disable new tasks from being submitted
- threadPool.shutdown();
- try {
-
- // Wait a while for existing tasks to terminate
- while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
- LOG.debug("Waiting for all stats tasks to finish...");
- }
- // Cancel currently executing tasks
- threadPool.shutdownNow();
-
- // Wait a while for tasks to respond to being cancelled
- if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
- LOG.debug("Stats collection thread pool did not terminate");
- }
- } catch (InterruptedException ie) {
-
- // Cancel again if current thread also interrupted
- threadPool.shutdownNow();
-
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
- private String toString(Map<String, String> parameters) {
- StringBuilder builder = new StringBuilder();
- for (String statType : StatsSetupConst.supportedStats) {
- String value = parameters.get(statType);
- if (value != null) {
- if (builder.length() > 0) {
- builder.append(", ");
- }
- builder.append(statType).append('=').append(value);
- }
- }
- return builder.toString();
- }
-
- private List<Partition> getPartitionsList() throws HiveException {
- if (work.getTableSpecs() != null) {
- TableSpec tblSpec = work.getTableSpecs();
- table = tblSpec.tableHandle;
- if (!table.isPartitioned()) {
- return null;
- } else {
- return tblSpec.partitions;
- }
- }
- return null;
- }
-}
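
For quick reference, the footer-based collection that the deleted StatsNoJobTask performed (and that the new StatsTask delegates to BasicStatsNoJobTask, per the imports in the next diff) boils down to the loop sketched below. This is a condensed sketch of StatsCollection.run() from the code above, using only calls that appear there; it is not a standalone program, since it needs a real data directory, input format, and JobConf.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

class FooterStatsSketch {
  /** Returns {numRows, rawDataSize, totalSize, numFiles} read from file footers, without scanning rows. */
  static long[] gather(Path dir, InputFormat<?, ?> inputFormat, JobConf jc) throws Exception {
    long numRows = 0, rawDataSize = 0, fileSize = 0, numFiles = 0;
    FileSystem fs = dir.getFileSystem(jc);
    for (FileStatus file : HiveStatsUtils.getFileStatusRecurse(dir, -1, fs)) {
      if (file.isDir()) {
        continue; // only leaf files carry footers
      }
      // A zero-length split suffices: only the reader's footer statistics are consulted.
      InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { dir.toString() });
      RecordReader<?, ?> reader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
      if (reader instanceof StatsProvidingRecordReader) {
        // ORC's record reader implements StatsProvidingRecordReader and exposes footer stats.
        StatsProvidingRecordReader statsReader = (StatsProvidingRecordReader) reader;
        numRows += statsReader.getStats().getRowCount();
        rawDataSize += statsReader.getStats().getRawDataSize();
        fileSize += file.getLen();
        numFiles += 1;
      }
      reader.close();
    }
    return new long[] { numRows, rawDataSize, fileSize, numFiles };
  }
}
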
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 682b42c..567126e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -16,501 +16,143 @@
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.stats.StatsAggregator;
-import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
-import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.stats.BasicStatsNoJobTask;
+import org.apache.hadoop.hive.ql.stats.BasicStatsTask;
+import org.apache.hadoop.hive.ql.stats.ColStatsProcessor;
+import org.apache.hadoop.hive.ql.stats.IStatsProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
- * StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
- * stats that require data scanning and are collected during query execution (unless the user
- * explicitly requests data scanning just for the purpose of stats computation using the "ANALYZE"
- * command. All other stats are computed directly by the MetaStore. The rationale being that the
- * MetaStore layer covers all Thrift calls and provides better guarantees about the accuracy of
- * those stats.
+ * StatsTask implementation.
**/
-public class StatsTask extends Task<StatsWork> implements Serializable {
+public class StatsTask extends Task<StatsWork> implements Serializable {
private static final long serialVersionUID = 1L;
private static transient final Logger LOG = LoggerFactory.getLogger(StatsTask.class);
- private Table table;
- private Collection<Partition> dpPartSpecs;
-
public StatsTask() {
super();
- dpPartSpecs = null;
}
- @Override
- protected void receiveFeed(FeedType feedType, Object feedValue) {
- // this method should be called by MoveTask when there are dynamic partitions generated
- if (feedType == FeedType.DYNAMIC_PARTITIONS) {
- dpPartSpecs = (Collection<Partition>) feedValue;
- }
- }
+ List<IStatsProcessor> processors = new ArrayList<>();
@Override
- public int execute(DriverContext driverContext) {
- if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
- return 0;
- }
- LOG.info("Executing stats task");
- // Make sure that it is either an ANALYZE, INSERT OVERWRITE (maybe load) or CTAS command
- short workComponentsPresent = 0;
- if (work.getLoadTableDesc() != null) {
- workComponentsPresent++;
- }
- if (work.getTableSpecs() != null) {
- workComponentsPresent++;
+ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
+ CompilationOpContext opContext) {
+ super.initialize(queryState, queryPlan, ctx, opContext);
+
+ if (work.getBasicStatsWork() != null) {
+ BasicStatsTask task = new BasicStatsTask(conf, work.getBasicStatsWork());
+ task.followedColStats = work.hasColStats();
+ processors.add(0, task);
+ } else if (work.isFooterScan()) {
+ BasicStatsNoJobTask t = new BasicStatsNoJobTask(conf, work.getBasicStatsNoJobWork());
+ processors.add(0, t);
}
- if (work.getLoadFileDesc() != null) {
- workComponentsPresent++;
+ if (work.hasColStats()) {
+ processors.add(new ColStatsProcessor(work.getColStats(), conf));
}
- assert (workComponentsPresent == 1);
-
- String tableName = "";
- Hive hive = getHive();
- try {
- if (work.getLoadTableDesc() != null) {
- tableName = work.getLoadTableDesc().getTable().getTableName();
- } else if (work.getTableSpecs() != null){
- tableName = work.getTableSpecs().tableName;
- } else {
- tableName = work.getLoadFileDesc().getDestinationCreateTable();
- }
-
- table = hive.getTable(tableName);
-
- } catch (HiveException e) {
- LOG.error("Cannot get table {}", tableName, e);
- console.printError("Cannot get table " + tableName, e.toString());
+ for (IStatsProcessor p : processors) {
+ p.initialize(opContext);
}
-
- return aggregateStats(hive);
-
}
- @Override
- public StageType getType() {
- return StageType.STATS;
- }
@Override
- public String getName() {
- return "STATS";
- }
-
- private int aggregateStats(Hive db) {
-
- StatsAggregator statsAggregator = null;
+ public int execute(DriverContext driverContext) {
+ if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
+ return 0;
+ }
+ if (work.isAggregating() && work.isFooterScan()) {
+ throw new RuntimeException("Can not have both basic stats work and stats no job work!");
+ }
int ret = 0;
- StatsCollectionContext scc = null;
- EnvironmentContext environmentContext = null;
try {
- // Stats setup:
- final Warehouse wh = new Warehouse(conf);
- if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
- try {
- scc = getContext();
- statsAggregator = createStatsAggregator(scc, conf);
- } catch (HiveException e) {
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
- throw e;
- }
- console.printError(ErrorMsg.STATS_SKIPPING_BY_ERROR.getErrorCodedMsg(e.toString()));
- }
- }
-
- List<Partition> partitions = getPartitionsList(db);
- boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
-
- String tableFullName = table.getFullyQualifiedName();
-
- if (partitions == null) {
- org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
- Map<String, String> parameters = tTable.getParameters();
- // In the following scenarios, we need to reset the stats to true.
- // work.getTableSpecs() != null means analyze command
- // work.getLoadTableDesc().getReplace() is true means insert overwrite command
- // work.getLoadFileDesc().getDestinationCreateTable().isEmpty() means CTAS etc.
- // acidTable will not have accurate stats unless it is set through analyze command.
- if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) {
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
- } else if (work.getTableSpecs() != null
- || (work.getLoadTableDesc() != null
- && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
- || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
- .getDestinationCreateTable().isEmpty())) {
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
- }
- // non-partitioned tables:
- if (!existStats(parameters) && atomic) {
- return 0;
- }
-
- // The collectable stats for the aggregator needs to be cleared.
- // For eg. if a file is being loaded, the old number of rows are not valid
- if (work.isClearAggregatorStats()) {
- // we choose to keep the invalid stats and only change the setting.
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
- }
-
- updateQuickStats(wh, parameters, tTable.getSd());
- if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
- if (statsAggregator != null) {
- String prefix = getAggregationPrefix(table, null);
- updateStats(statsAggregator, parameters, prefix, atomic);
- }
- // write table stats to metastore
- if (!getWork().getNoStatsAggregator()) {
- environmentContext = new EnvironmentContext();
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
- StatsSetupConst.TASK);
- }
- }
-
- getHive().alterTable(table, environmentContext);
- if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
- console.printInfo("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
- }
- LOG.info("Table {} stats: [{}]", tableFullName, toString(parameters));
- if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
- Utilities.FILE_OP_LOGGER.trace(
- "Table " + tableFullName + " stats: [" + toString(parameters) + ']');
- }
- } else {
- // Partitioned table:
- // Need to get the old stats of the partition
- // and update the table stats based on the old and new stats.
- List<Partition> updates = new ArrayList<Partition>();
-
- //Get the file status up-front for all partitions. Beneficial in cases of blob storage systems
- final Map<String, FileStatus[]> fileStatusMap = new ConcurrentHashMap<String, FileStatus[]>();
- int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
- // In case thread count is set to 0, use single thread.
- poolSize = Math.max(poolSize, 1);
- final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("stats-updater-thread-%d")
- .build());
- final List<Future<Void>> futures = Lists.newLinkedList();
- LOG.debug("Getting file stats of all partitions. threadpool size: {}", poolSize);
- try {
- for(final Partition partn : partitions) {
- final String partitionName = partn.getName();
- final org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
- Map<String, String> parameters = tPart.getParameters();
-
- if (!existStats(parameters) && atomic) {
- continue;
- }
- futures.add(pool.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- FileStatus[] partfileStatus = wh.getFileStatusesForSD(tPart.getSd());
- fileStatusMap.put(partitionName, partfileStatus);
- return null;
- }
- }));
- }
- pool.shutdown();
- for(Future<Void> future : futures) {
- future.get();
- }
- } catch (InterruptedException e) {
- LOG.debug("Cancelling {} file stats lookup tasks", futures.size());
- //cancel other futures
- for (Future future : futures) {
- future.cancel(true);
- }
- // Fail the query if the stats are supposed to be reliable
- if (work.isStatsReliable()) {
- ret = 1;
- }
- } finally {
- if (pool != null) {
- pool.shutdownNow();
- }
- LOG.debug("Finished getting file stats of all partitions");
- }
- for (Partition partn : partitions) {
- //
- // get the old partition stats
- //
- org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
- Map<String, String> parameters = tPart.getParameters();
- if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) {
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
- } else if (work.getTableSpecs() != null
- || (work.getLoadTableDesc() != null
- && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
- || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
- .getDestinationCreateTable().isEmpty())) {
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
- }
- //only when the stats exist, it is added to fileStatusMap
- if (!fileStatusMap.containsKey(partn.getName())) {
- continue;
- }
-
- // The collectable stats for the aggregator needs to be cleared.
- // For eg. if a file is being loaded, the old number of rows are not valid
- if (work.isClearAggregatorStats()) {
- // we choose to keep the invalid stats and only change the setting.
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
- }
+ if (work.isFooterScan()) {
+ work.getBasicStatsNoJobWork().setPartitions(work.getPartitions());
+ }
- updateQuickStats(parameters, fileStatusMap.get(partn.getName()));
- if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
- if (statsAggregator != null) {
- String prefix = getAggregationPrefix(table, partn);
- updateStats(statsAggregator, parameters, prefix, atomic);
- }
- if (!getWork().getNoStatsAggregator()) {
- environmentContext = new EnvironmentContext();
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
- StatsSetupConst.TASK);
- }
- }
- updates.add(new Partition(table, tPart));
+ Hive db = getHive();
+ Table tbl = getTable(db);
- if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
- console.printInfo("Partition " + tableFullName + partn.getSpec() +
- " stats: [" + toString(parameters) + ']');
- }
- LOG.info("Partition {}{} stats: [{}]", tableFullName, partn.getSpec(),
- toString(parameters));
- }
- if (!updates.isEmpty()) {
- db.alterPartitions(tableFullName, updates, environmentContext);
+ for (IStatsProcessor task : processors) {
+ task.setDpPartSpecs(dpPartSpecs);
+ ret = task.process(db, tbl);
+ if (ret != 0) {
+ return ret;
}
}
-
} catch (Exception e) {
- console.printInfo("[Warning] could not update stats.",
- "Failed with exception " + e.getMessage() + "\n"
- + StringUtils.stringifyException(e));
-
- // Fail the query if the stats are supposed to be reliable
- if (work.isStatsReliable()) {
- ret = 1;
- }
- } finally {
- if (statsAggregator != null) {
- statsAggregator.closeConnection(scc);
- }
+ LOG.error("Failed to run stats task", e);
+ return 1;
}
- // The return value of 0 indicates success,
- // anything else indicates failure
- return ret;
+ return 0;
}
- private String getAggregationPrefix(Table table, Partition partition)
- throws MetaException {
- // prefix is of the form dbName.tblName
- String prefix = StatsUtils.getFullyQualifiedTableName(table.getDbName(),
- org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.encodeTableName(table.getTableName()));
- if (partition != null) {
- return Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+ private Table getTable(Hive db) throws SemanticException, HiveException {
+ Table tbl = work.getTable();
+ // FIXME for ctas this is still needed because location is not set sometimes
+ if (tbl.getSd().getLocation() == null) {
+ tbl = db.getTable(work.getFullTableName());
}
- return prefix;
+ return tbl;
}
- private StatsAggregator createStatsAggregator(StatsCollectionContext scc, HiveConf conf) throws HiveException {
- String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
- if (factory == null) {
- throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
- }
- // initialize stats publishing table for noscan which has only stats task
- // the rest of MR task following stats task initializes it in ExecDriver.java
- StatsPublisher statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(scc)) { // creating stats table if not exists
- throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
- }
-
- // manufacture a StatsAggregator
- StatsAggregator statsAggregator = factory.getStatsAggregator();
- if (!statsAggregator.connect(scc)) {
- throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl));
- }
- return statsAggregator;
- }
-
- private StatsCollectionContext getContext() throws HiveException {
-
- StatsCollectionContext scc = new StatsCollectionContext(conf);
- Task sourceTask = getWork().getSourceTask();
- if (sourceTask == null) {
- throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
- }
- scc.setTask(sourceTask);
- scc.setStatsTmpDir(this.getWork().getStatsTmpDir());
- return scc;
- }
-
- private boolean existStats(Map<String, String> parameters) {
- return parameters.containsKey(StatsSetupConst.ROW_COUNT)
- || parameters.containsKey(StatsSetupConst.NUM_FILES)
- || parameters.containsKey(StatsSetupConst.TOTAL_SIZE)
- || parameters.containsKey(StatsSetupConst.RAW_DATA_SIZE)
- || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
- }
-
- private void updateStats(StatsAggregator statsAggregator,
- Map<String, String> parameters, String prefix, boolean atomic)
- throws HiveException {
-
- String aggKey = prefix.endsWith(Path.SEPARATOR) ? prefix : prefix + Path.SEPARATOR;
-
- for (String statType : StatsSetupConst.statsRequireCompute) {
- String value = statsAggregator.aggregateStats(aggKey, statType);
- if (value != null && !value.isEmpty()) {
- long longValue = Long.parseLong(value);
-
- if (work.getLoadTableDesc() != null &&
- (work.getLoadTableDesc().getLoadFileType() != LoadFileType.REPLACE_ALL)) {
- String originalValue = parameters.get(statType);
- if (originalValue != null) {
- longValue += Long.parseLong(originalValue); // todo: invalid + valid = invalid
- }
- }
- parameters.put(statType, String.valueOf(longValue));
- } else {
- if (atomic) {
- throw new HiveException(ErrorMsg.STATSAGGREGATOR_MISSED_SOMESTATS, statType);
- }
- }
- }
+ @Override
+ public StageType getType() {
+ return StageType.STATS;
}
- private void updateQuickStats(Warehouse wh, Map<String, String> parameters,
- StorageDescriptor desc) throws MetaException {
- /**
- * calculate fast statistics
- */
- FileStatus[] partfileStatus = wh.getFileStatusesForSD(desc);
- updateQuickStats(parameters, partfileStatus);
+ @Override
+ public String getName() {
+ return "STATS TASK";
}
- private void updateQuickStats(Map<String, String> parameters,
- FileStatus[] partfileStatus) throws MetaException {
- MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
- }
+ private Collection<Partition> dpPartSpecs;
- private String toString(Map<String, String> parameters) {
- StringBuilder builder = new StringBuilder();
- for (String statType : StatsSetupConst.supportedStats) {
- String value = parameters.get(statType);
- if (value != null) {
- if (builder.length() > 0) {
- builder.append(", ");
- }
- builder.append(statType).append('=').append(value);
- }
+ @Override
+ protected void receiveFeed(FeedType feedType, Object feedValue) {
+ // this method should be called by MoveTask when there are dynamic
+ // partitions generated
+ if (feedType == FeedType.DYNAMIC_PARTITIONS) {
+ dpPartSpecs = (Collection<Partition>) feedValue;
}
- return builder.toString();
}
- /**
- * Get the list of partitions that need to update statistics.
- * TODO: we should reuse the Partitions generated at compile time
- * since getting the list of partitions is quite expensive.
- *
- * @return a list of partitions that need to update statistics.
- * @throws HiveException
- */
- private List<Partition> getPartitionsList(Hive db) throws HiveException {
- if (work.getLoadFileDesc() != null) {
- return null; //we are in CTAS, so we know there are no partitions
- }
-
- List<Partition> list = new ArrayList<Partition>();
+ public static ExecutorService newThreadPool(HiveConf conf) {
+ int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
- if (work.getTableSpecs() != null) {
-
- // ANALYZE command
- TableSpec tblSpec = work.getTableSpecs();
- table = tblSpec.tableHandle;
- if (!table.isPartitioned()) {
- return null;
- }
- // get all partitions that matches with the partition spec
- List<Partition> partitions = tblSpec.partitions;
- if (partitions != null) {
- for (Partition partn : partitions) {
- list.add(partn);
- }
- }
- } else if (work.getLoadTableDesc() != null) {
-
- // INSERT OVERWRITE command
- LoadTableDesc tbd = work.getLoadTableDesc();
- table = db.getTable(tbd.getTable().getTableName());
- if (!table.isPartitioned()) {
- return null;
- }
- DynamicPartitionCtx dpCtx = tbd.getDPCtx();
- if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
- // If no dynamic partitions are generated, dpPartSpecs may not be initialized
- if (dpPartSpecs != null) {
- // load the list of DP partitions and return the list of partition specs
- list.addAll(dpPartSpecs);
- }
- } else { // static partition
- Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
- list.add(partn);
- }
- }
- return list;
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StatsNoJobTask-Thread-%d").build());
+ LOG.info("Initialized threadpool for stats computation with {} threads", numThreads);
+ return executor;
}
}
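Note on the hunk above: the rewritten task exposes a shared daemon thread pool through newThreadPool() instead of each stats task building its own executor. Purely as an illustration (none of this is in the patch), a caller sitting in the same class could drain such a pool along the following lines; computeOnePartition(Partition) is a hypothetical stand-in for the real per-partition footer-scan work, and the imports are assumed to be those already present in this file.

  // Sketch only (not part of HIVE-16827): driving the daemon pool created by newThreadPool().
  private void gatherAll(HiveConf conf, Collection<Partition> partitions) throws Exception {
    ExecutorService pool = newThreadPool(conf);          // the method added above
    List<Future<?>> results = new ArrayList<>();
    for (Partition p : partitions) {
      results.add(pool.submit(() -> computeOnePartition(p)));  // hypothetical helper
    }
    for (Future<?> f : results) {
      f.get();   // re-throws any per-partition failure wrapped in an ExecutionException
    }
    pool.shutdown();
  }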
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index ab495cf..75603ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -349,6 +349,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
final List<Task<? extends Serializable>> leafTasks = new ArrayList<Task<?>>();
NodeUtils.iterateTask(rootTasks, Task.class, new NodeUtils.Function<Task>() {
+ @Override
public void apply(Task task) {
List dependents = task.getDependentTasks();
if (dependents == null || dependents.isEmpty()) {
@@ -648,4 +649,5 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
public boolean canExecuteInParallel(){
return true;
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 36a5eff..e22dc25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -53,8 +53,6 @@ import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -98,10 +96,7 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<MapredLocalWork>(MapredLocalWork.class,
MapredLocalTask.class));
- taskvec.add(new TaskTuple<StatsWork>(StatsWork.class,
- StatsTask.class));
- taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
- taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
+ taskvec.add(new TaskTuple<StatsWork>(StatsWork.class, StatsTask.class));
taskvec.add(new TaskTuple<ColumnStatsUpdateWork>(ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class));
taskvec.add(new TaskTuple<MergeFileWork>(MergeFileWork.class,
MergeFileTask.class));
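With the StatsNoJobWork and ColumnStatsWork registrations removed above, every flavour of statistics gathering is now requested by building a single StatsWork and letting the factory hand back a StatsTask. A minimal sketch of the no-scan case, using only the constructors and setters this patch itself exercises in GenMRTableScan1 further down ('table' and 'conf' stand for the target Table and the session HiveConf):

  // Sketch only, mirroring the calls made in GenMRTableScan1 below.
  StatsWork statWork = new StatsWork(table, conf);   // footer-scan (noscan) variant
  statWork.setFooterScan();
  Task<StatsWork> statsTask = TaskFactory.get(statWork, conf);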
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 71fa42c..00590e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -153,6 +153,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.IStatsGatherDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -161,7 +162,6 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
import org.apache.hadoop.hive.ql.plan.api.Graph;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -1620,12 +1620,14 @@ public final class Utilities {
return removeTempOrDuplicateFiles(
fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept);
}
-
+
private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
FileStatus[] items = fs.listStatus(path);
// remove empty directory since DP insert should not generate empty partitions.
// empty directories could be generated by crashed Task/ScriptOperator
- if (items.length != 0) return false;
+ if (items.length != 0) {
+ return false;
+ }
if (!fs.delete(path, true)) {
LOG.error("Cannot delete empty directory {}", path);
throw new IOException("Cannot delete empty directory " + path);
@@ -3607,7 +3609,9 @@ public final class Utilities {
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
- if (fdesc.isMmTable()) continue; // No need to create for MM tables
+ if (fdesc.isMmTable()) {
+ continue; // No need to create for MM tables
+ }
Path tempDir = fdesc.getDirName();
if (tempDir != null) {
Path tempPath = Utilities.toTempPath(tempDir);
@@ -3923,10 +3927,8 @@ public final class Utilities {
for (Operator<? extends OperatorDesc> op : ops) {
OperatorDesc desc = op.getConf();
String statsTmpDir = null;
- if (desc instanceof FileSinkDesc) {
- statsTmpDir = ((FileSinkDesc)desc).getStatsTmpDir();
- } else if (desc instanceof TableScanDesc) {
- statsTmpDir = ((TableScanDesc) desc).getTmpStatsDir();
+ if (desc instanceof IStatsGatherDesc) {
+ statsTmpDir = ((IStatsGatherDesc) desc).getTmpStatsDir();
}
if (statsTmpDir != null && !statsTmpDir.isEmpty()) {
statsTmpDirs.add(statsTmpDir);
@@ -4078,7 +4080,9 @@ public final class Utilities {
}
private static Path[] statusToPath(FileStatus[] statuses) {
- if (statuses == null) return null;
+ if (statuses == null) {
+ return null;
+ }
Path[] paths = new Path[statuses.length];
for (int i = 0; i < statuses.length; ++i) {
paths[i] = statuses[i].getPath();
@@ -4108,7 +4112,9 @@ public final class Utilities {
Utilities.FILE_OP_LOGGER.trace("Looking at {} from {}", subDir, lfsPath);
// If sorted, we'll skip a bunch of files.
- if (lastRelDir != null && subDir.startsWith(lastRelDir)) continue;
+ if (lastRelDir != null && subDir.startsWith(lastRelDir)) {
+ continue;
+ }
int startIx = skipLevels > 0 ? -1 : 0;
for (int i = 0; i < skipLevels; ++i) {
startIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
@@ -4118,7 +4124,9 @@ public final class Utilities {
break;
}
}
- if (startIx == -1) continue;
+ if (startIx == -1) {
+ continue;
+ }
int endIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
if (endIx == -1) {
Utilities.FILE_OP_LOGGER.info("Expected level of nesting ({}) is not present in"
@@ -4127,7 +4135,9 @@ public final class Utilities {
}
lastRelDir = subDir = subDir.substring(0, endIx);
Path candidate = new Path(relRoot, subDir);
- if (!filter.accept(candidate)) continue;
+ if (!filter.accept(candidate)) {
+ continue;
+ }
results.add(fs.makeQualified(candidate));
}
return results.toArray(new Path[results.size()]);
@@ -4168,7 +4178,7 @@ public final class Utilities {
public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException {
- if (CollectionUtils.isEmpty(commitPaths)) {
+ if (commitPaths.isEmpty()) {
return;
}
// We assume one FSOP per task (per specPath), so we create it in specPath.
@@ -4288,11 +4298,15 @@ public final class Utilities {
throw new HiveException("The following files were committed but not found: " + committed);
}
- if (mmDirectories.isEmpty()) return;
+ if (mmDirectories.isEmpty()) {
+ return;
+ }
// TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing,
// so maintain parity here by not calling it at all.
- if (lbLevels != 0) return;
+ if (lbLevels != 0) {
+ return;
+ }
// Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
// doesn't need to check anything except path and directory status for MM directories.
FileStatus[] finalResults = new FileStatus[mmDirectories.size()];
@@ -4320,7 +4334,9 @@ public final class Utilities {
for (FileStatus child : fs.listStatus(dir)) {
Path childPath = child.getPath();
if (unionSuffix == null) {
- if (committed.remove(childPath.toString())) continue; // A good file.
+ if (committed.remove(childPath.toString())) {
+ continue; // A good file.
+ }
deleteUncommitedFile(childPath, fs);
} else if (!child.isDirectory()) {
if (committed.contains(childPath.toString())) {
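The getStatsTmpDirs hunk above now dispatches through the new IStatsGatherDesc interface instead of special-casing FileSinkDesc and TableScanDesc. The interface itself is outside this excerpt; judging only from the call site, it presumably declares at least the method below (a sketch, not the actual ql.plan.IStatsGatherDesc source, which may declare more).

  // Presumed shape of the interface, inferred from its single use in getStatsTmpDirs above.
  public interface IStatsGatherDesc {
    String getTmpStatsDir();
  }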
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index cf4df9b..cceea01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1794,7 +1794,9 @@ public class Hive {
}
// column stats will be inaccurate
- StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
+ if (!hasFollowingStatsTask) {
+ StatsSetupConst.clearColumnStatsState(newTPart.getParameters());
+ }
// recreate the partition if it existed before
if (isSkewedStoreAsSubdir) {
@@ -1813,8 +1815,8 @@ public class Hive {
if (oldPart == null) {
newTPart.getTPartition().setParameters(new HashMap<String,String>());
if (this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
- StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(), null,
- StatsSetupConst.TRUE);
+ StatsSetupConst.setStatsStateForCreateTable(newTPart.getParameters(),
+ MetaStoreUtils.getColumnNames(tbl.getCols()), StatsSetupConst.TRUE);
}
MetaStoreUtils.populateQuickStats(HiveStatsUtils.getFileStatusRecurse(newPartPath, -1, newPartPath.getFileSystem(conf)), newTPart.getParameters());
try {
@@ -2299,7 +2301,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
//column stats will be inaccurate
- StatsSetupConst.clearColumnStatsState(tbl.getParameters());
+ if (!hasFollowingStatsTask) {
+ StatsSetupConst.clearColumnStatsState(tbl.getParameters());
+ }
try {
if (isSkewedStoreAsSubdir) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 78e83af..1c26200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -196,7 +197,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
Collections.sort(tableNames);
return tableNames;
}
-
+
@Override
public List<TableMeta> getTableMeta(String dbPatterns, String tablePatterns, List<String> tableTypes)
throws MetaException {
@@ -235,7 +236,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
}
return tableMetas;
}
-
+
private boolean matchesAny(String string, List<Matcher> matchers) {
for (Matcher matcher : matchers) {
if (matcher.reset(string).matches()) {
@@ -399,6 +400,8 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
EnvironmentContext envContext) throws AlreadyExistsException, InvalidObjectException,
MetaException, NoSuchObjectException, TException {
+ boolean isVirtualTable = tbl.getTableName().startsWith(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX);
+
SessionState ss = SessionState.get();
if (ss == null) {
throw new MetaException("No current SessionState, cannot create temporary table"
@@ -434,6 +437,10 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
// Add temp table info to current session
Table tTable = new Table(tbl);
+ if (!isVirtualTable) {
+ StatsSetupConst.setStatsStateForCreateTable(tbl.getParameters(),
+ org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getColumnNamesForTable(tbl), StatsSetupConst.TRUE);
+ }
if (tables == null) {
tables = new HashMap<String, Table>();
ss.getTempTables().put(dbName, tables);
@@ -466,8 +473,6 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
}
org.apache.hadoop.hive.metastore.api.Table newtCopy = deepCopyAndLowerCaseTable(newt);
- MetaStoreUtils.updateTableStatsFast(newtCopy,
- getWh().getFileStatusesForSD(newtCopy.getSd()), false, true, envContext);
Table newTable = new Table(newtCopy);
String newDbName = newTable.getDbName();
String newTableName = newTable.getTableName();
@@ -656,7 +661,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
public static Map<String, Table> getTempTablesForDatabase(String dbName) {
return getTempTables().get(dbName);
}
-
+
public static Map<String, Map<String, Table>> getTempTables() {
SessionState ss = SessionState.get();
if (ss == null) {
@@ -712,6 +717,13 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
ssTableColStats);
}
mergeColumnStats(ssTableColStats, colStats);
+
+ List<String> colNames = new ArrayList<>();
+ for (ColumnStatisticsObj obj : colStats.getStatsObj()) {
+ colNames.add(obj.getColName());
+ }
+ org.apache.hadoop.hive.metastore.api.Table table = getTempTable(dbName, tableName);
+ StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ec9cc0bc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index 768640c..4fb39fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -23,12 +23,10 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -41,10 +39,10 @@ import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.BasicStatsWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.mapred.InputFormat;
/**
@@ -67,8 +65,8 @@ public class GenMRTableScan1 implements NodeProcessor {
TableScanOperator op = (TableScanOperator) nd;
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
ParseContext parseCtx = ctx.getParseCtx();
- Class<? extends InputFormat> inputFormat = op.getConf().getTableMetadata()
- .getInputFormatClass();
+ Table table = op.getConf().getTableMetadata();
+ Class<? extends InputFormat> inputFormat = table.getInputFormatClass();
Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
// create a dummy MapReduce task
@@ -93,19 +91,17 @@ public class GenMRTableScan1 implements NodeProcessor {
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
// There will not be any MR or Tez job above this task
- StatsNoJobWork snjWork = new StatsNoJobWork(op.getConf().getTableMetadata().getTableSpec());
- snjWork.setStatsReliable(parseCtx.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
+ StatsWork statWork = new StatsWork(table, parseCtx.getConf());
+ statWork.setFooterScan();
+
// If partition is specified, get pruned partition list
Set<Partition> confirmedParts = GenMapRedUtils.getConfirmedPartitionsForScan(op);
if (confirmedParts.size() > 0) {
- Table source = op.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(op);
- PrunedPartitionList partList = new PrunedPartitionList(source, confirmedParts,
- partCols, false);
- snjWork.setPrunedPartitionList(partList);
+ PrunedPartitionList partList = new PrunedPartitionList(table, confirmedParts, partCols, false);
+ statWork.addInputPartitions(partList.getPartitions());
}
- Task<StatsNoJobWork> snjTask = TaskFactory.get(snjWork, parseCtx.getConf());
+ Task<StatsWork> snjTask = TaskFactory.get(statWork, parseCtx.getConf());
ctx.setCurrTask(snjTask);
ctx.setCurrTopOp(null);
ctx.getRootTasks().clear();
@@ -115,14 +111,15 @@ public class GenMRTableScan1 implements NodeProcessor {
// The plan consists of a simple MapRedTask followed by a StatsTask.
// The MR task is just a simple TableScanOperator
- StatsWork statsWork = new StatsWork(op.getConf().getTableMetadata().getTableSpec());
- statsWork.setAggKey(op.getConf().getStatsAggPrefix());
- statsWork.setStatsTmpDir(op.getConf().getTmpStatsDir());
- statsWork.setSourceTask(currTask);
- statsWork.setStatsReliable(parseCtx.getConf().getBoolVar(
- HiveConf.ConfVars.HIVE_STATS_RELIABLE));
- Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseCtx.getConf());
- currTask.addDependentTask(statsTask);
+ BasicStatsWork statsWork = new BasicStatsWork(table.getTableSpec());
+
+ statsWork.setNoScanAnalyzeCommand(noScan);
+ StatsWork columnStatsWork = new StatsWork(table, statsWork, parseCtx.getConf());
+ columnStatsWork.collectStatsFromAggregator(op.getConf());
+
+ columnStatsWork.setSourceTask(currTask);
+ Task<StatsWork> columnStatsTask = TaskFactory.get(columnStatsWork, parseCtx.getConf());
+ currTask.addDependentTask(columnStatsTask);
if (!ctx.getRootTasks().contains(currTask)) {
ctx.getRootTasks().add(currTask);
}
@@ -130,10 +127,9 @@ public class GenMRTableScan1 implements NodeProcessor {
// ANALYZE TABLE T [PARTITION (...)] COMPUTE STATISTICS noscan;
// The plan consists of a StatsTask only.
if (noScan) {
- statsTask.setParentTasks(null);
- statsWork.setNoScanAnalyzeCommand(true);
+ columnStatsTask.setParentTasks(null);
ctx.getRootTasks().remove(currTask);
- ctx.getRootTasks().add(statsTask);
+ ctx.getRootTasks().add(columnStatsTask);
}
currWork.getMapWork().setGatheringStats(true);
@@ -147,9 +143,8 @@ public class GenMRTableScan1 implements NodeProcessor {
Set<Partition> confirmedPartns = GenMapRedUtils
.getConfirmedPartitionsForScan(op);
if (confirmedPartns.size() > 0) {
- Table source = op.getConf().getTableMetadata();
List<String> partCols = GenMapRedUtils.getPartitionColumns(op);
- PrunedPartitionList partList = new PrunedPartitionList(source, confirmedPartns, partCols, false);
+ PrunedPartitionList partList = new PrunedPartitionList(table, confirmedPartns, partCols, false);
GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx, partList);
} else { // non-partitioned table
GenMapRedUtils.setTaskPlan(currAliasId, op, currTask, false, ctx);