Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/03 18:38:00 UTC

svn commit: r1636402 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: SparkTask.java SparkUtilities.java

Author: xuefu
Date: Mon Nov  3 17:38:00 2014
New Revision: 1636402

URL: http://svn.apache.org/r1636402
Log:
HIVE-8682: Enable table statistics collection on counter for CTAS query [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
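
For context: the new code path in SparkTask only fires when hive.stats.dbclass is set to
"counter" and a StatsTask exists downstream of the SparkTask. As a rough sketch (the class
name, table name, and prefix length below are assumptions for illustration, not part of this
commit), the non-partitioned/CTAS counter prefix is derived with the same helper the patch
calls:

    import org.apache.hadoop.hive.ql.exec.Utilities;

    // Sketch only: derive the counter group name to pre-register for a
    // non-partitioned target, roughly the way the new
    // SparkTask.getRequiredCounterPrefix() does.
    public class CounterPrefixSketch {
      public static void main(String[] args) {
        // For CTAS the target table does not exist yet, so the patch falls
        // back to the lower-cased table name as the stats prefix.
        String prefix = "ctas_target_table";   // assumed table name
        int maxPrefixLength = 200;             // assumed max stats key prefix length
        String counterGroup = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength);
        System.out.println("Counter group to pre-register: " + counterGroup);
      }
    }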

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1636402&r1=1636401&r2=1636402&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Mon Nov  3 17:38:00 2014
@@ -19,15 +19,21 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
@@ -37,12 +43,19 @@ import org.apache.hadoop.hive.ql.exec.sp
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
@@ -78,10 +91,14 @@ public class SparkTask extends Task<Spar
       sparkSession = sparkSessionManager.getSession(sparkSession, conf, true);
       SessionState.get().setSparkSession(sparkSession);
       SparkWork sparkWork = getWork();
+
+      // Pre-register Spark counters for queries whose table statistics are collected via counters.
       String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-      if (statsImpl.equalsIgnoreCase("counter")) {
-        sparkWork.setRequiredCounterPrefix(SparkUtilities.getRequiredCounterPrefix(this, db));
+      StatsTask statsTask = getStatsTaskInChildTasks(this);
+      if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
+        sparkWork.setRequiredCounterPrefix(getRequiredCounterPrefix(statsTask));
       }
+
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       sparkCounters = jobRef.getSparkJobStatus().getCounter();
       SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus());
@@ -183,4 +200,106 @@ public class SparkTask extends Task<Spar
     console.printInfo("In order to set a constant number of reducers:");
     console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
+
+  private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
+    List<String> prefixs = new LinkedList<String>();
+    StatsWork statsWork = statsTask.getWork();
+    String prefix = getPrefix(statsWork);
+    List<Partition> partitions = getPartitionsList(statsWork);
+    int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
+
+    if (partitions == null) {
+      prefixs.add(Utilities.getHashedStatsPrefix(prefix, maxPrefixLength));
+    } else {
+      for (Partition partition : partitions) {
+        String prefixWithPartition = Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+        prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
+      }
+    }
+
+    return prefixs;
+  }
+
+  private String getPrefix(StatsWork work) throws HiveException {
+    String tableName;
+    if (work.getLoadTableDesc() != null) {
+      tableName = work.getLoadTableDesc().getTable().getTableName();
+    } else if (work.getTableSpecs() != null) {
+      tableName = work.getTableSpecs().tableName;
+    } else {
+      tableName = work.getLoadFileDesc().getDestinationCreateTable();
+    }
+    Table table = null;
+    try {
+      table = db.getTable(tableName);
+    } catch (HiveException e) {
+      LOG.warn("Failed to get table: " + tableName);
+      // For a CTAS query, the table does not exist yet, so just use the table name as the prefix.
+      return tableName.toLowerCase();
+    }
+    return table.getDbName() + "." + table.getTableName();
+  }
+
+  private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
+
+    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
+    if (childTasks == null) {
+      return null;
+    }
+    for (Task<? extends Serializable> task : childTasks) {
+      if (task instanceof StatsTask) {
+        return (StatsTask)task;
+      } else {
+        Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
+        if (childTask instanceof StatsTask) {
+          return (StatsTask)childTask;
+        } else {
+          continue;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private List<Partition> getPartitionsList(StatsWork work) throws HiveException {
+    if (work.getLoadFileDesc() != null) {
+      return null; // we are in CTAS, so we know there are no partitions
+    }
+    Table table;
+    List<Partition> list = new ArrayList<Partition>();
+
+    if (work.getTableSpecs() != null) {
+
+      // ANALYZE command
+      BaseSemanticAnalyzer.tableSpec tblSpec = work.getTableSpecs();
+      table = tblSpec.tableHandle;
+      if (!table.isPartitioned()) {
+        return null;
+      }
+      // get all partitions that match the partition spec
+      List<Partition> partitions = tblSpec.partitions;
+      if (partitions != null) {
+        for (Partition partn : partitions) {
+          list.add(partn);
+        }
+      }
+    } else if (work.getLoadTableDesc() != null) {
+
+      // INSERT OVERWRITE command
+      LoadTableDesc tbd = work.getLoadTableDesc();
+      table = db.getTable(tbd.getTable().getTableName());
+      if (!table.isPartitioned()) {
+        return null;
+      }
+      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
+      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
+        // Dynamic partition information is not available before the SparkTask executes, so no prefixes are pre-registered here.
+      } else { // static partition
+        Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+        list.add(partn);
+      }
+    }
+    return list;
+  }
 }
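
For the partitioned cases handled above (a static partition of an INSERT OVERWRITE, or the
partitions matched by ANALYZE), a minimal sketch of how a per-partition counter prefix is
assembled, assuming a hypothetical default.sales table partitioned by ds (table, column, and
values below are illustrative, not from this commit):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    // Sketch only: build the per-partition counter prefix the way
    // getRequiredCounterPrefix() does for a partitioned target.
    public class PartitionPrefixSketch {
      public static void main(String[] args) throws MetaException {
        String tablePrefix = "default.sales";               // assumed db.table
        Map<String, String> partSpec = new LinkedHashMap<String, String>();
        partSpec.put("ds", "2014-11-03");                    // assumed partition column/value
        int maxPrefixLength = 200;                           // assumed max stats key prefix length

        // Same composition as the patch: join the table prefix with the
        // partition path, then hash/truncate to the configured length.
        String withPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partSpec));
        String counterGroup = Utilities.getHashedStatsPrefix(withPartition, maxPrefixLength);
        System.out.println("Counter group to pre-register: " + counterGroup);
      }
    }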

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1636402&r1=1636401&r2=1636402&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Mon Nov  3 17:38:00 2014
@@ -17,35 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.io.BytesWritable;
 
 /**
@@ -69,109 +41,4 @@ public class SparkUtilities {
     copy.set(bw);
     return copy;
   }
-
-  public static List<String> getRequiredCounterPrefix(SparkTask sparkTask, Hive db)
-    throws HiveException, MetaException {
-
-    List<String> prefixs = new LinkedList<String>();
-    List<BaseWork> works = sparkTask.getWork().getAllWork();
-    for (BaseWork baseWork : works) {
-      Set<Operator<?>> operators = baseWork.getAllOperators();
-      for (Operator<?> operator : operators) {
-        if (operator instanceof TableScanOperator) {
-          TableScanOperator tableScanOperator = (TableScanOperator) operator;
-          TableScanDesc tableScanDesc = tableScanOperator.getConf();
-
-          if (tableScanDesc.isGatherStats()) {
-            List<Task<? extends Serializable>> childTasks = getChildTasks(sparkTask);
-            for (Task<? extends Serializable> task : childTasks) {
-              if (task instanceof StatsTask) {
-                StatsTask statsTask = (StatsTask) task;
-                StatsWork statsWork = statsTask.getWork();
-                // ANALYZE command
-                BaseSemanticAnalyzer.tableSpec tblSpec = statsWork.getTableSpecs();
-                Table table = tblSpec.tableHandle;
-                if (!table.isPartitioned()) {
-                  prefixs.add(tableScanDesc.getStatsAggPrefix()); // non-partitioned
-                } else {
-                  for (Partition partition : tblSpec.partitions) {
-                    String aggrPrefix = getAggregationPrefix(
-                      table, partition.getSpec(), tableScanDesc.getMaxStatsKeyPrefixLength());
-                    prefixs.add(aggrPrefix);
-                  }
-                }
-              }
-            }
-          }
-        } else if (operator instanceof FileSinkOperator) {
-          FileSinkOperator fileSinkOperator = (FileSinkOperator) operator;
-          FileSinkDesc fileSinkDesc = fileSinkOperator.getConf();
-
-          if (fileSinkDesc.isGatherStats()) {
-            List<Task<? extends Serializable>> childTasks = getChildTasks(sparkTask);
-            for (Task<? extends Serializable> task : childTasks) {
-              if (task instanceof MoveTask) {
-                MoveTask moveTask = (MoveTask) task;
-                MoveWork moveWork = moveTask.getWork();
-
-                // INSERT OVERWRITE command
-                LoadTableDesc tbd = moveWork.getLoadTableWork();
-                Table table = db.getTable(tbd.getTable().getTableName());
-                if (!table.isPartitioned()) {
-                  prefixs.add(
-                    getAggregationPrefix(table, null, fileSinkDesc.getMaxStatsKeyPrefixLength()));
-                } else {
-                  DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-                  if (dpCtx == null || dpCtx.getNumDPCols() == 0) {
-                    // static partition
-                    Map<String, String> partitionSpec = tbd.getPartitionSpec();
-                    if (partitionSpec != null && !partitionSpec.isEmpty()) {
-                      String aggrPrefix = getAggregationPrefix(
-                        table, partitionSpec, fileSinkDesc.getMaxStatsKeyPrefixLength());
-                      prefixs.add(aggrPrefix);
-                    }
-                  } else {
-                    // dynamic partition
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-    return prefixs;
-  }
-
-  private static String getAggregationPrefix(Table table, Map<String, String> partitionSpec, int maxKeyLength)
-    throws MetaException {
-    StringBuilder prefix = new StringBuilder();
-    // prefix is of the form dbName.tblName
-    prefix.append(table.getDbName()).append('.').append(table.getTableName());
-    if (partitionSpec != null) {
-      return Utilities.join(prefix.toString(), Warehouse.makePartPath(partitionSpec));
-    }
-    return Utilities.getHashedStatsPrefix(prefix.toString(), maxKeyLength);
-  }
-
-  private static List<Task<? extends Serializable>> getChildTasks(
-    Task<? extends Serializable> rootTask) {
-
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-    fillChildTasks(tasks, rootTask);
-    return tasks;
-  }
-
-  private static void fillChildTasks(
-    List<Task<? extends Serializable>> tasks,
-    Task<? extends Serializable> rootTask) {
-
-    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-    tasks.add(rootTask);
-    if (childTasks != null) {
-      for (Task<? extends Serializable> task : childTasks) {
-        fillChildTasks(tasks, task);
-      }
-    }
-  }
 }