You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/07 18:05:53 UTC
svn commit: r1637410 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql:
exec/spark/SparkClient.java exec/spark/SparkTask.java
exec/spark/counter/SparkCounters.java plan/SparkWork.java
Author: xuefu
Date: Fri Nov 7 17:05:52 2014
New Revision: 1637410
URL: http://svn.apache.org/r1637410
Log:
HIVE-8777: Should only register used counters in SparkCounters [Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Fri Nov 7 17:05:52 2014
@@ -197,12 +197,13 @@ public class SparkClient implements Seri
fs.mkdirs(emptyScratchDir);
SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
- List<String> prefixes = sparkWork.getRequiredCounterPrefix();
+ Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
// register spark counters before submit spark job.
if (prefixes != null) {
- for (String prefix : prefixes) {
- sparkCounters.createCounter(prefix, StatsSetupConst.ROW_COUNT);
- sparkCounters.createCounter(prefix, StatsSetupConst.RAW_DATA_SIZE);
+ for (String group : prefixes.keySet()) {
+ for (String counter : prefixes.get(group)) {
+ sparkCounters.createCounter(group, counter);
+ }
}
}
SparkReporter sparkReporter = new SparkReporter(sparkCounters);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Fri Nov 7 17:05:52 2014
@@ -22,18 +22,27 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -97,12 +106,7 @@ public class SparkTask extends Task<Spar
SessionState.get().setSparkSession(sparkSession);
SparkWork sparkWork = getWork();
- // We need to pre register spark counters for table statistic collection related query.
- String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsTask statsTask = getStatsTaskInChildTasks(this);
- if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
- sparkWork.setRequiredCounterPrefix(getRequiredCounterPrefix(statsTask));
- }
+ sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
@@ -225,18 +229,38 @@ public class SparkTask extends Task<Spar
console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
}
+ private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
+ Map<String, List<String>> result = getOperatorCounters();
+ StatsTask childStatsTask = getStatsTaskInChildTasks(this);
+ String statsDbClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ // Counter-based table stats need ROW_COUNT and RAW_DATA_SIZE registered for every stats prefix.
+ if (!statsDbClass.equalsIgnoreCase("counter") || childStatsTask == null) {
+ return result;
+ }
+ for (String tablePrefix : getRequiredCounterPrefix(childStatsTask)) {
+ List<String> names = result.get(tablePrefix);
+ if (names == null) {
+ names = new LinkedList<String>();
+ result.put(tablePrefix, names);
+ }
+ names.add(StatsSetupConst.ROW_COUNT);
+ names.add(StatsSetupConst.RAW_DATA_SIZE);
+ }
+ return result;
+ }
+
private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
List<String> prefixs = new LinkedList<String>();
StatsWork statsWork = statsTask.getWork();
- String prefix = getPrefix(statsWork);
+ String tablePrefix = getTablePrefix(statsWork); // table-level prefix; per-partition prefixes below are built from it
List<Partition> partitions = getPartitionsList(statsWork);
int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
if (partitions == null) {
- prefixs.add(Utilities.getHashedStatsPrefix(prefix, maxPrefixLength));
+ prefixs.add(Utilities.getHashedStatsPrefix(tablePrefix, maxPrefixLength)); // no partition list: a single table-level prefix
} else {
for (Partition partition : partitions) {
- String prefixWithPartition = Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+ String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partition.getSpec())); // table prefix joined with this partition's path, then passed through getHashedStatsPrefix
prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
}
}
@@ -244,7 +268,7 @@ public class SparkTask extends Task<Spar
return prefixs;
}
- private String getPrefix(StatsWork work) throws HiveException {
+ private String getTablePrefix(StatsWork work) throws HiveException {
String tableName;
if (work.getLoadTableDesc() != null) {
tableName = work.getLoadTableDesc().getTable().getTableName();
@@ -326,4 +350,40 @@ public class SparkTask extends Task<Spar
}
return list;
}
+
+ /** Returns the Hive operator counter names this task's works may update, without duplicates, keyed by the HIVECOUNTERGROUP group name. */
+ private Map<String, List<String>> getOperatorCounters() {
+ String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+ Map<String, List<String>> counters = new HashMap<String, List<String>>();
+ List<String> hiveCounters = new LinkedList<String>();
+ counters.put(groupName, hiveCounters);
+ hiveCounters.add(Operator.HIVECOUNTERCREATEDFILES);
+ SparkWork sparkWork = this.getWork();
+ for (BaseWork work : sparkWork.getAllWork()) {
+ for (Operator<?> operator : work.getAllOperators()) {
+ if (operator instanceof MapOperator) {
+ addCountersOnce(hiveCounters, MapOperator.Counter.values());
+ } else if (operator instanceof FileSinkOperator) {
+ addCountersOnce(hiveCounters, FileSinkOperator.Counter.values());
+ } else if (operator instanceof ReduceSinkOperator) {
+ addCountersOnce(hiveCounters, ReduceSinkOperator.Counter.values());
+ } else if (operator instanceof ScriptOperator) {
+ addCountersOnce(hiveCounters, ScriptOperator.Counter.values());
+ } else if (operator instanceof JoinOperator) {
+ addCountersOnce(hiveCounters, JoinOperator.SkewkeyTableCounter.values());
+ }
+ }
+ }
+ return counters;
+ }
+
+ /** Appends each counter's name to target unless already present (e.g. DESERIALIZE_ERRORS is defined by both MapOperator and ScriptOperator). */
+ private static void addCountersOnce(List<String> target, Enum<?>[] counterValues) {
+ for (Enum<?> counter : counterValues) {
+ String name = counter.toString();
+ if (!target.contains(name)) {
+ target.add(name);
+ }
+ }
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java Fri Nov 7 17:05:52 2014
@@ -59,28 +59,6 @@ public class SparkCounters implements Se
this.javaSparkContext = javaSparkContext;
this.hiveConf = hiveConf;
sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
- initializeSparkCounters();
- }
-
- /**
- * pre-define all needed Counters here.
- */
- private void initializeSparkCounters() {
- String groupName = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP);
- createCounter(groupName, Operator.HIVECOUNTERCREATEDFILES);
- createCounter(groupName, MapOperator.Counter.DESERIALIZE_ERRORS);
- createCounter(groupName, MapOperator.Counter.RECORDS_IN);
- createCounter(groupName, FileSinkOperator.Counter.RECORDS_OUT);
- createCounter(groupName, ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE);
- createCounter(groupName, ScriptOperator.Counter.DESERIALIZE_ERRORS);
- createCounter(groupName, ScriptOperator.Counter.SERIALIZE_ERRORS);
- createCounter(groupName, JoinOperator.SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS);
- // TODO remove? changed due to HIVE-8429
- createCounter(MapOperator.Counter.DESERIALIZE_ERRORS);
- createCounter(MapOperator.Counter.RECORDS_IN);
- createCounter(ScriptOperator.Counter.DESERIALIZE_ERRORS);
- createCounter(ScriptOperator.Counter.SERIALIZE_ERRORS);
- createCounter(JoinOperator.SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS);
}
public void createCounter(Enum<?> key) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1637410&r1=1637409&r2=1637410&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Fri Nov 7 17:05:52 2014
@@ -54,7 +54,7 @@ public class SparkWork extends AbstractO
protected final Map<Pair<BaseWork, BaseWork>, SparkEdgeProperty> edgeProperties =
new HashMap<Pair<BaseWork, BaseWork>, SparkEdgeProperty>();
- private List<String> requiredCounterPrefix;
+ private Map<String, List<String>> requiredCounterPrefix;
public SparkWork(String name) {
this.name = name + ":" + (++counter);
@@ -176,11 +176,11 @@ public class SparkWork extends AbstractO
return new HashSet<BaseWork>(leaves);
}
- public void setRequiredCounterPrefix(List<String> requiredCounterPrefix) {
+ public void setRequiredCounterPrefix(Map<String, List<String>> requiredCounterPrefix) { // counter group name -> counter names to register before the Spark job is submitted
this.requiredCounterPrefix = requiredCounterPrefix;
}
- public List<String> getRequiredCounterPrefix() {
+ public Map<String, List<String>> getRequiredCounterPrefix() { // may be null; SparkClient null-checks it before registering counters
return requiredCounterPrefix;
}