Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/03 18:38:00 UTC
svn commit: r1636402 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
SparkTask.java SparkUtilities.java
Author: xuefu
Date: Mon Nov 3 17:38:00 2014
New Revision: 1636402
URL: http://svn.apache.org/r1636402
Log:
HIVE-8682: Enable table statistics collection via counters for CTAS queries [Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1636402&r1=1636401&r2=1636402&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Mon Nov 3 17:38:00 2014
@@ -19,15 +19,21 @@
package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
@@ -37,12 +43,19 @@ import org.apache.hadoop.hive.ql.exec.sp
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
@@ -78,10 +91,14 @@ public class SparkTask extends Task<Spar
sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
SessionState.get().setSparkSession(sparkSession);
SparkWork sparkWork = getWork();
+
+ // Pre-register the Spark counters needed by counter-based table statistics collection.
String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- if (statsImpl.equalsIgnoreCase("counter")) {
- sparkWork.setRequiredCounterPrefix(SparkUtilities.getRequiredCounterPrefix(this, db));
+ StatsTask statsTask = getStatsTaskInChildTasks(this);
+ if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
+ sparkWork.setRequiredCounterPrefix(getRequiredCounterPrefix(statsTask));
}
+
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
sparkCounters = jobRef.getSparkJobStatus().getCounter();
SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
@@ -183,4 +200,106 @@ public class SparkTask extends Task<Spar
console.printInfo("In order to set a constant number of reducers:");
console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
}
+
+ private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
+ List<String> prefixes = new LinkedList<String>();
+ StatsWork statsWork = statsTask.getWork();
+ String prefix = getPrefix(statsWork);
+ List<Partition> partitions = getPartitionsList(statsWork);
+ int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
+
+ if (partitions == null) {
+ prefixes.add(Utilities.getHashedStatsPrefix(prefix, maxPrefixLength));
+ } else {
+ for (Partition partition : partitions) {
+ String prefixWithPartition = Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+ prefixes.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
+ }
+ }
+
+ return prefixes;
+ }
+
+ private String getPrefix(StatsWork work) throws HiveException {
+ String tableName;
+ if (work.getLoadTableDesc() != null) {
+ tableName = work.getLoadTableDesc().getTable().getTableName();
+ } else if (work.getTableSpecs() != null) {
+ tableName = work.getTableSpecs().tableName;
+ } else {
+ tableName = work.getLoadFileDesc().getDestinationCreateTable();
+ }
+ Table table = null;
+ try {
+ table = db.getTable(tableName);
+ } catch (HiveException e) {
+ LOG.warn("Failed to get table: " + tableName, e);
+ // For a CTAS query the table does not exist yet at this point, so just use the table name as the prefix.
+ return tableName.toLowerCase();
+ }
+ return table.getDbName() + "." + table.getTableName();
+ }
+
+ private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
+
+ List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
+ if (childTasks == null) {
+ return null;
+ }
+ for (Task<? extends Serializable> task : childTasks) {
+ if (task instanceof StatsTask) {
+ return (StatsTask) task;
+ }
+ StatsTask childStatsTask = getStatsTaskInChildTasks(task);
+ if (childStatsTask != null) {
+ return childStatsTask;
+ }
+ }
+
+ return null;
+ }
+
+ private List<Partition> getPartitionsList(StatsWork work) throws HiveException {
+ if (work.getLoadFileDesc() != null) {
+ return null; // we are in CTAS, so we know there are no partitions
+ }
+ Table table;
+ List<Partition> list = new ArrayList<Partition>();
+
+ if (work.getTableSpecs() != null) {
+
+ // ANALYZE command
+ BaseSemanticAnalyzer.tableSpec tblSpec = work.getTableSpecs();
+ table = tblSpec.tableHandle;
+ if (!table.isPartitioned()) {
+ return null;
+ }
+ // get all partitions that match the partition spec
+ List<Partition> partitions = tblSpec.partitions;
+ if (partitions != null) {
+ list.addAll(partitions);
+ }
+ } else if (work.getLoadTableDesc() != null) {
+
+ // INSERT OVERWRITE command
+ LoadTableDesc tbd = work.getLoadTableDesc();
+ table = db.getTable(tbd.getTable().getTableName());
+ if (!table.isPartitioned()) {
+ return null;
+ }
+ DynamicPartitionCtx dpCtx = tbd.getDPCtx();
+ if (dpCtx == null || dpCtx.getNumDPCols() == 0) { // static partition
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+ list.add(partn);
+ }
+ // Dynamic-partition information is not available until the SparkTask has run, so nothing is added here.
+ }
+ return list;
+ }
}
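
The prefixes computed above are the keys under which the Spark job's statistics counters are registered before submission (the comment in the first hunk explains why they must be known up front). As a rough standalone sketch of the same derivation, reusing only helpers the patch itself calls; the table name, partition spec, and max prefix length below are illustrative assumptions, not values from this commit:

  import java.util.LinkedHashMap;
  import java.util.Map;

  import org.apache.hadoop.hive.metastore.Warehouse;
  import org.apache.hadoop.hive.ql.exec.Utilities;

  public class StatsPrefixSketch {
    public static void main(String[] args) throws Exception {
      // Illustrative values; the patch obtains these from StatsFactory.getMaxPrefixLength(conf)
      // and from getPrefix(StatsWork) respectively.
      int maxPrefixLength = 150;
      String tablePrefix = "default.t1";  // dbName.tableName; a CTAS falls back to the bare table name

      // Unpartitioned table (including the CTAS case): one prefix for the whole table.
      System.out.println(Utilities.getHashedStatsPrefix(tablePrefix, maxPrefixLength));

      // Partitioned table: one prefix per partition, of the form dbName.tableName/partPath.
      Map<String, String> partSpec = new LinkedHashMap<String, String>();
      partSpec.put("ds", "2014-11-03");
      String partitionPrefix = Utilities.join(tablePrefix, Warehouse.makePartPath(partSpec));
      System.out.println(Utilities.getHashedStatsPrefix(partitionPrefix, maxPrefixLength));
    }
  }

Each prefix identifies one table or partition; pre-registering them lets the counters accumulate during the job so that StatsTask can aggregate them per prefix afterwards.
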
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1636402&r1=1636401&r2=1636402&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Mon Nov 3 17:38:00 2014
@@ -17,35 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.io.BytesWritable;
/**
@@ -69,109 +41,4 @@ public class SparkUtilities {
copy.set(bw);
return copy;
}
-
- public static List<String> getRequiredCounterPrefix(SparkTask sparkTask, Hive db)
- throws HiveException, MetaException {
-
- List<String> prefixs = new LinkedList<String>();
- List<BaseWork> works = sparkTask.getWork().getAllWork();
- for (BaseWork baseWork : works) {
- Set<Operator<?>> operators = baseWork.getAllOperators();
- for (Operator<?> operator : operators) {
- if (operator instanceof TableScanOperator) {
- TableScanOperator tableScanOperator = (TableScanOperator) operator;
- TableScanDesc tableScanDesc = tableScanOperator.getConf();
-
- if (tableScanDesc.isGatherStats()) {
- List<Task<? extends Serializable>> childTasks = getChildTasks(sparkTask);
- for (Task<? extends Serializable> task : childTasks) {
- if (task instanceof StatsTask) {
- StatsTask statsTask = (StatsTask) task;
- StatsWork statsWork = statsTask.getWork();
- // ANALYZE command
- BaseSemanticAnalyzer.tableSpec tblSpec = statsWork.getTableSpecs();
- Table table = tblSpec.tableHandle;
- if (!table.isPartitioned()) {
- prefixs.add(tableScanDesc.getStatsAggPrefix()); // non-partitioned
- } else {
- for (Partition partition : tblSpec.partitions) {
- String aggrPrefix = getAggregationPrefix(
- table, partition.getSpec(), tableScanDesc.getMaxStatsKeyPrefixLength());
- prefixs.add(aggrPrefix);
- }
- }
- }
- }
- }
- } else if (operator instanceof FileSinkOperator) {
- FileSinkOperator fileSinkOperator = (FileSinkOperator) operator;
- FileSinkDesc fileSinkDesc = fileSinkOperator.getConf();
-
- if (fileSinkDesc.isGatherStats()) {
- List<Task<? extends Serializable>> childTasks = getChildTasks(sparkTask);
- for (Task<? extends Serializable> task : childTasks) {
- if (task instanceof MoveTask) {
- MoveTask moveTask = (MoveTask) task;
- MoveWork moveWork = moveTask.getWork();
-
- // INSERT OVERWRITE command
- LoadTableDesc tbd = moveWork.getLoadTableWork();
- Table table = db.getTable(tbd.getTable().getTableName());
- if (!table.isPartitioned()) {
- prefixs.add(
- getAggregationPrefix(table, null, fileSinkDesc.getMaxStatsKeyPrefixLength()));
- } else {
- DynamicPartitionCtx dpCtx = tbd.getDPCtx();
- if (dpCtx == null || dpCtx.getNumDPCols() == 0) {
- // static partition
- Map<String, String> partitionSpec = tbd.getPartitionSpec();
- if (partitionSpec != null && !partitionSpec.isEmpty()) {
- String aggrPrefix = getAggregationPrefix(
- table, partitionSpec, fileSinkDesc.getMaxStatsKeyPrefixLength());
- prefixs.add(aggrPrefix);
- }
- } else {
- // dynamic partition
- }
- }
- }
- }
- }
- }
- }
- }
- return prefixs;
- }
-
- private static String getAggregationPrefix(Table table, Map<String, String> partitionSpec, int maxKeyLength)
- throws MetaException {
- StringBuilder prefix = new StringBuilder();
- // prefix is of the form dbName.tblName
- prefix.append(table.getDbName()).append('.').append(table.getTableName());
- if (partitionSpec != null) {
- return Utilities.join(prefix.toString(), Warehouse.makePartPath(partitionSpec));
- }
- return Utilities.getHashedStatsPrefix(prefix.toString(), maxKeyLength);
- }
-
- private static List<Task<? extends Serializable>> getChildTasks(
- Task<? extends Serializable> rootTask) {
-
- List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
- fillChildTasks(tasks, rootTask);
- return tasks;
- }
-
- private static void fillChildTasks(
- List<Task<? extends Serializable>> tasks,
- Task<? extends Serializable> rootTask) {
-
- List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
- tasks.add(rootTask);
- if (childTasks != null) {
- for (Task<? extends Serializable> task : childTasks) {
- fillChildTasks(tasks, task);
- }
- }
- }
}
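
The helper methods deleted here were folded into SparkTask (see the first file in this commit); in particular, the collect-then-scan traversal of getChildTasks/fillChildTasks is replaced by the direct recursive lookup getStatsTaskInChildTasks. The pattern is an ordinary depth-first search over the task tree; a self-contained sketch, with Node as an illustrative stand-in for Hive's Task type:

  import java.util.Arrays;
  import java.util.List;

  public class TaskTreeSearchSketch {
    // Illustrative stand-in for Task<? extends Serializable>; not a Hive type.
    interface Node {
      List<Node> getChildren();
    }

    // Return the first descendant of the wanted type, or null if none exists.
    static <T> T findFirst(Node root, Class<T> wanted) {
      List<Node> children = root.getChildren();
      if (children == null) {
        return null;
      }
      for (Node child : children) {
        if (wanted.isInstance(child)) {
          return wanted.cast(child);
        }
        T found = findFirst(child, wanted);
        if (found != null) {
          return found;
        }
      }
      return null;
    }

    public static void main(String[] args) {
      // Three-level chain: root -> child -> grandchild (Java 8 lambdas for brevity).
      Node grandchild = () -> null;
      Node child = () -> Arrays.asList(grandchild);
      Node root = () -> Arrays.asList(child);
      System.out.println(findFirst(root, Node.class) == child);  // true: the search stops at the first match
    }
  }

getStatsTaskInChildTasks(this) in the patch is this same search specialized to StatsTask, so the Spark counters are only pre-registered when a StatsTask will actually consume them.
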