Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/07 18:05:53 UTC

svn commit: r1637410 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkClient.java exec/spark/SparkTask.java exec/spark/counter/SparkCounters.java plan/SparkWork.java

Author: xuefu
Date: Fri Nov  7 17:05:52 2014
New Revision: 1637410

URL: http://svn.apache.org/r1637410
Log:
HIVE-8777: Should only register used counters in SparkCounters [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Fri Nov  7 17:05:52 2014
@@ -197,12 +197,13 @@ public class SparkClient implements Seri
     fs.mkdirs(emptyScratchDir);
 
     SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
-    List<String> prefixes = sparkWork.getRequiredCounterPrefix();
+    Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
     // Register Spark counters before submitting the Spark job.
     if (prefixes != null) {
-      for (String prefix : prefixes) {
-        sparkCounters.createCounter(prefix, StatsSetupConst.ROW_COUNT);
-        sparkCounters.createCounter(prefix, StatsSetupConst.RAW_DATA_SIZE);
+      for (String group : prefixes.keySet()) {
+        for (String counter : prefixes.get(group)) {
+          sparkCounters.createCounter(group, counter);
+        }
       }
     }
     SparkReporter sparkReporter = new SparkReporter(sparkCounters);

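A side note on the new registration loop: it iterates prefixes.keySet() and then calls prefixes.get(group) inside the loop; iterating entrySet() does the same work with one hash lookup instead of two. A minimal sketch of the equivalent step, assuming only the SparkCounters.createCounter(String, String) method visible in this diff (the helper name and its standalone framing are illustrative, not part of the commit):

    // Register every requested counter before job submission, so the
    // backing Spark accumulators exist when executors start incrementing.
    private static void registerCounters(SparkCounters sparkCounters,
        Map<String, List<String>> prefixes) {
      if (prefixes == null) {
        return; // nothing was requested
      }
      for (Map.Entry<String, List<String>> entry : prefixes.entrySet()) {
        for (String counter : entry.getValue()) {
          sparkCounters.createCounter(entry.getKey(), counter);
        }
      }
    }
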
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Fri Nov  7 17:05:52 2014
@@ -22,18 +22,27 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -97,12 +106,7 @@ public class SparkTask extends Task<Spar
       SessionState.get().setSparkSession(sparkSession);
       SparkWork sparkWork = getWork();
 
-      // We need to pre register spark counters for table statistic collection related query.
-      String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-      StatsTask statsTask = getStatsTaskInChildTasks(this);
-      if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
-        sparkWork.setRequiredCounterPrefix(getRequiredCounterPrefix(statsTask));
-      }
+      sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
 
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
@@ -225,18 +229,38 @@ public class SparkTask extends Task<Spar
     console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
 
+  private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
+    Map<String, List<String>> counters = getOperatorCounters();
+    StatsTask statsTask = getStatsTaskInChildTasks(this);
+    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    // Fetch table prefixes if this SparkTask gathers table statistics via counters.
+    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
+      List<String> prefixes = getRequiredCounterPrefix(statsTask);
+      for (String prefix : prefixes) {
+        List<String> counterGroup = counters.get(prefix);
+        if (counterGroup == null) {
+          counterGroup = new LinkedList<String>();
+          counters.put(prefix, counterGroup);
+        }
+        counterGroup.add(StatsSetupConst.ROW_COUNT);
+        counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
+      }
+    }
+    return counters;
+  }
+
   private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
     List<String> prefixs = new LinkedList<String>();
     StatsWork statsWork = statsTask.getWork();
-    String prefix = getPrefix(statsWork);
+    String tablePrefix = getTablePrefix(statsWork);
     List<Partition> partitions = getPartitionsList(statsWork);
     int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
 
     if (partitions == null) {
-      prefixs.add(Utilities.getHashedStatsPrefix(prefix, maxPrefixLength));
+      prefixs.add(Utilities.getHashedStatsPrefix(tablePrefix, maxPrefixLength));
     } else {
       for (Partition partition : partitions) {
-        String prefixWithPartition = Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+        String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partition.getSpec()));
         prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
       }
     }
@@ -244,7 +268,7 @@ public class SparkTask extends Task<Spar
     return prefixs;
   }
 
-  private String getPrefix(StatsWork work) throws HiveException {
+  private String getTablePrefix(StatsWork work) throws HiveException {
       String tableName;
       if (work.getLoadTableDesc() != null) {
         tableName = work.getLoadTableDesc().getTable().getTableName();
@@ -326,4 +350,40 @@ public class SparkTask extends Task<Spar
     }
     return list;
   }
+
+  private Map<String, List<String>> getOperatorCounters() {
+    String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    Map<String, List<String>> counters = new HashMap<String, List<String>>();
+    List<String> hiveCounters = new LinkedList<String>();
+    counters.put(groupName, hiveCounters);
+    hiveCounters.add(Operator.HIVECOUNTERCREATEDFILES);
+    SparkWork sparkWork = this.getWork();
+    for (BaseWork work : sparkWork.getAllWork()) {
+      for (Operator operator : work.getAllOperators()) {
+        if (operator instanceof MapOperator) {
+          for (MapOperator.Counter counter : MapOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof FileSinkOperator) {
+          for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof ReduceSinkOperator) {
+          for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof ScriptOperator) {
+          for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof JoinOperator) {
+          for (JoinOperator.SkewkeyTableCounter counter : JoinOperator.SkewkeyTableCounter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        }
+      }
+    }
+
+    return counters;
+  }
 }

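The instanceof chain in the new getOperatorCounters() repeats the same "add every enum constant's name" walk once per operator type. A hedged sketch of how that repetition could be factored out (the helper below is illustrative and not part of this commit; the Counter enums are the ones referenced in the diff):

    // Append the string form of every constant of an operator counter enum.
    private static void addEnumNames(List<String> target, Enum<?>[] values) {
      for (Enum<?> value : values) {
        target.add(value.toString());
      }
    }

    // Usage inside the operator loop, e.g.:
    //   if (operator instanceof MapOperator) {
    //     addEnumNames(hiveCounters, MapOperator.Counter.values());
    //   }
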
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java Fri Nov  7 17:05:52 2014
@@ -59,28 +59,6 @@ public class SparkCounters implements Se
     this.javaSparkContext = javaSparkContext;
     this.hiveConf = hiveConf;
     sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
-    initializeSparkCounters();
-  }
-
-  /**
-   * pre-define all needed Counters here.
-   */
-  private void initializeSparkCounters() {
-    String groupName = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP);
-    createCounter(groupName, Operator.HIVECOUNTERCREATEDFILES);
-    createCounter(groupName, MapOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(groupName, MapOperator.Counter.RECORDS_IN);
-    createCounter(groupName, FileSinkOperator.Counter.RECORDS_OUT);
-    createCounter(groupName, ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE);
-    createCounter(groupName, ScriptOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(groupName, ScriptOperator.Counter.SERIALIZE_ERRORS);
-    createCounter(groupName, JoinOperator.SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS);
-    // TODO remove? changed due to HIVE-8429
-    createCounter(MapOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(MapOperator.Counter.RECORDS_IN);
-    createCounter(ScriptOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(ScriptOperator.Counter.SERIALIZE_ERRORS);
-    createCounter(JoinOperator.SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS);
   }
 
   public void createCounter(Enum<?> key) {

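With initializeSparkCounters() removed, a SparkCounters instance now starts empty; only counters explicitly registered before submission exist at all, which is the point of HIVE-8777 (register only the counters a query actually uses, rather than a fixed pre-defined set). A minimal sketch, assuming the constructor and the createCounter(String, String) overload shown in this diff (the group and counter names below are illustrative):

    SparkCounters counters = new SparkCounters(sc, hiveConf); // starts empty now
    counters.createCounter("HIVE", "RECORDS_IN");  // registered only on demand
    counters.createCounter("HIVE", "RECORDS_OUT");
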
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Fri Nov  7 17:05:52 2014
@@ -54,7 +54,7 @@ public class SparkWork extends AbstractO
   protected final Map<Pair<BaseWork, BaseWork>, SparkEdgeProperty> edgeProperties =
       new HashMap<Pair<BaseWork, BaseWork>, SparkEdgeProperty>();
 
-  private List<String> requiredCounterPrefix;
+  private Map<String, List<String>> requiredCounterPrefix;
 
   public SparkWork(String name) {
     this.name = name + ":" + (++counter);
@@ -176,11 +176,11 @@ public class SparkWork extends AbstractO
     return new HashSet<BaseWork>(leaves);
   }
 
-  public void setRequiredCounterPrefix(List<String> requiredCounterPrefix) {
+  public void setRequiredCounterPrefix(Map<String, List<String>> requiredCounterPrefix) {
     this.requiredCounterPrefix = requiredCounterPrefix;
   }
 
-  public List<String> getRequiredCounterPrefix() {
+  public Map<String, List<String>> getRequiredCounterPrefix() {
     return requiredCounterPrefix;
   }