You are viewing a plain text version of this content; the canonical (linked) version is available in the mailing list archive.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/18 19:24:46 UTC
svn commit: r1646506 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/
exec/spark/ exec/spark/status/ log/ parse/spark/
Author: xuefu
Date: Thu Dec 18 18:24:45 2014
New Revision: 1646506
URL: http://svn.apache.org/r1646506
Log:
HIVE-9136: Profile query compiler [Spark Branch] (Chao via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Thu Dec 18 18:24:45 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -44,6 +45,8 @@ import org.apache.hadoop.hive.serde2.obj
public class SparkHashTableSinkOperator
extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
private static final long serialVersionUID = 1L;
+ private final String CLASS_NAME = this.getClass().getName();
+ private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
private HashTableSinkOperator htsOperator;
@@ -90,6 +93,7 @@ public class SparkHashTableSinkOperator
protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
byte tag) throws IOException, HiveException {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
MapredLocalWork localWork = getExecContext().getLocalWork();
BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
Path inputPath = getExecContext().getCurrentInputPath();
@@ -151,6 +155,7 @@ public class SparkHashTableSinkOperator
FileStatus status = fs.getFileStatus(path);
htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path +
" (" + status.getLen() + " bytes)");
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
}
public void setTag(byte tag) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Thu Dec 18 18:24:45 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -56,7 +57,6 @@ import java.util.List;
*
*/
public class SparkMapRecordHandler extends SparkRecordHandler {
-
private static final String PLAN_KEY = "__MAP_PLAN__";
private MapOperator mo;
public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
@@ -67,6 +67,7 @@ public class SparkMapRecordHandler exten
private ExecMapperContext execContext;
public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
super.init(job, output, reporter);
isLogInfoEnabled = l4j.isInfoEnabled();
@@ -136,6 +137,7 @@ public class SparkMapRecordHandler exten
throw new RuntimeException("Map operator initialization failed", e);
}
}
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Thu Dec 18 18:24:45 2014
@@ -27,12 +27,16 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import com.google.common.base.Preconditions;
public class SparkPlan {
+ private final String CLASS_NAME = SparkPlan.class.getName();
+ private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
@@ -40,6 +44,7 @@ public class SparkPlan {
private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
= new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
for (SparkTran tran : getAllTrans()) {
@@ -75,6 +80,7 @@ public class SparkPlan {
}
}
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
return finalRDD;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Dec 18 18:24:45 2014
@@ -22,9 +22,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -32,9 +39,6 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -45,15 +49,14 @@ import org.apache.hadoop.hive.ql.stats.S
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import com.google.common.base.Preconditions;
public class SparkPlanGenerator {
+ private final String CLASS_NAME = SparkPlanGenerator.class.getName();
+ private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
private JavaSparkContext sc;
@@ -82,25 +85,29 @@ public class SparkPlanGenerator {
}
public SparkPlan generate(SparkWork sparkWork) throws Exception {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
SparkPlan sparkPlan = new SparkPlan();
cloneToWork = sparkWork.getCloneToWork();
workToTranMap.clear();
workToParentWorkTranMap.clear();
for (BaseWork work : sparkWork.getAllWork()) {
- SparkTran tran;
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+ SparkTran tran = generate(work);
SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
- tran = generate(work);
sparkPlan.addTran(tran);
sparkPlan.connect(parentTran, tran);
workToTranMap.put(work, tran);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
}
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
return sparkPlan;
}
// Generate (possibly get from a cached result) parent SparkTran
- private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
+ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
+ BaseWork work) throws Exception {
if (cloneToWork.containsKey(work)) {
BaseWork originalWork = cloneToWork.get(work);
if (workToParentWorkTranMap.containsKey(originalWork)) {
@@ -208,15 +215,17 @@ public class SparkPlanGenerator {
// Make sure we'll use a different plan path from the original one
HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
try {
- cloned.setPartitionerClass((Class<? extends Partitioner>) (Class.forName(HiveConf.getVar(cloned,
- HiveConf.ConfVars.HIVEPARTITIONER))));
+ cloned.setPartitionerClass((Class<? extends Partitioner>)
+ (Class.forName(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))));
} catch (ClassNotFoundException e) {
- String msg = "Could not find partitioner class: " + e.getMessage() + " which is specified by: " +
+ String msg = "Could not find partitioner class: " + e.getMessage() +
+ " which is specified by: " +
HiveConf.ConfVars.HIVEPARTITIONER.varname;
throw new IllegalArgumentException(msg, e);
}
if (work instanceof MapWork) {
- List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
+ List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
+ scratchDir, context, false);
Utilities.setInputPaths(cloned, inputPaths);
Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
Utilities.createTmpDirs(cloned, (MapWork) work);
@@ -224,7 +233,8 @@ public class SparkPlanGenerator {
MergeFileWork mergeFileWork = (MergeFileWork) work;
cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName());
cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
- cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
+ cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+ FileOutputFormat.class);
} else {
cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java Thu Dec 18 18:24:45 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
@@ -33,6 +34,8 @@ import java.util.Arrays;
import java.util.Iterator;
public abstract class SparkRecordHandler {
+ protected final String CLASS_NAME = this.getClass().getName();
+ protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private final Log LOG = LogFactory.getLog(this.getClass());
// used to log memory usage periodically
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Thu Dec 18 18:24:45 2014
@@ -34,11 +34,11 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -106,6 +106,7 @@ public class SparkReduceRecordHandler ex
private MapredLocalWork localWork = null;
public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
super.init(job, output, reporter);
rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
@@ -230,7 +231,7 @@ public class SparkReduceRecordHandler ex
throw new RuntimeException("Reduce operator initialization failed", e);
}
}
-
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Dec 18 18:24:45 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -75,6 +76,8 @@ import org.apache.hadoop.util.StringUtil
import com.google.common.collect.Lists;
public class SparkTask extends Task<SparkWork> {
+ private final String CLASS_NAME = SparkTask.class.getName();
+ private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private static final long serialVersionUID = 1L;
private transient JobConf job;
private transient ContentSummary inputSummary;
@@ -100,7 +103,10 @@ public class SparkTask extends Task<Spar
SparkWork sparkWork = getWork();
sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
if (sparkJobStatus != null) {
SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
@@ -198,6 +204,7 @@ public class SparkTask extends Task<Spar
if (w instanceof MapWork) {
List<BaseWork> parents = work.getParents(w);
boolean candidate = true;
+ // TODO: since we don't have UnionWork anymore, can we simplify this?
for (BaseWork parent: parents) {
if (!(parent instanceof UnionWork)) {
candidate = false;
@@ -290,7 +297,7 @@ public class SparkTask extends Task<Spar
} else {
tableName = work.getLoadFileDesc().getDestinationCreateTable();
}
- Table table = null;
+ Table table;
try {
table = db.getTable(tableName);
} catch (HiveException e) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Thu Dec 18 18:24:45 2014
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.spark.JobExecutionStatus;
@@ -41,6 +42,7 @@ public class SparkJobMonitor {
private static final Log LOG = LogFactory.getLog(CLASS_NAME);
private transient LogHelper console;
+ private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private final int checkInterval = 1000;
private final int printInterval = 3000;
private long lastPrintTime;
@@ -63,6 +65,9 @@ public class SparkJobMonitor {
Map<String, SparkStageProgress> lastProgressMap = null;
long startTime = -1;
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
while (true) {
JobExecutionStatus state = sparkJobStatus.getState();
try {
@@ -77,6 +82,7 @@ public class SparkJobMonitor {
switch (state) {
case RUNNING:
if (!running) {
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
// print job stages.
console.printInfo("\nQuery Hive on Spark job[" +
sparkJobStatus.getJobId() + "] stages:");
@@ -140,10 +146,13 @@ public class SparkJobMonitor {
}
}
}
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
return rc;
}
- private void printStatus(Map<String, SparkStageProgress> progressMap, Map<String, SparkStageProgress> lastProgressMap) {
+ private void printStatus(Map<String, SparkStageProgress> progressMap,
+ Map<String, SparkStageProgress> lastProgressMap) {
// do not print duplicate status while still in middle of print interval.
boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
@@ -170,9 +179,17 @@ public class SparkJobMonitor {
} else {
if (complete == total && !completed.contains(s)) {
completed.add(s);
+
+ if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE);
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE);
}
if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
/* stage is started, but not complete */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+ }
if (failed > 0) {
reportBuffer.append(
String.format(
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Thu Dec 18 18:24:45 2014
@@ -65,6 +65,17 @@ public class PerfLogger {
public static final String LOAD_HASHTABLE = "LoadHashtable";
public static final String ORC_GET_SPLITS = "OrcGetSplits";
+ public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
+ public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
+ public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
+ public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob";
+ public static final String SPARK_RUN_JOB = "SparkRunJob";
+ public static final String SPARK_CREATE_TRAN = "SparkCreateTran.";
+ public static final String SPARK_RUN_STAGE = "SparkRunStage.";
+ public static final String SPARK_INIT_OPERATORS = "SparkInitializeOperators";
+ public static final String SPARK_GENERATE_TASK_TREE = "SparkGenerateTaskTree";
+ public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+
protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();
protected final Map<String, Long> startTimes = new HashMap<String, Long>();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Thu Dec 18 18:24:45 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.lib.TypeRule;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -89,6 +90,8 @@ import org.apache.hadoop.hive.ql.session
* TODO: need to complete and make it fit to Spark.
*/
public class SparkCompiler extends TaskCompiler {
+ private static final String CLASS_NAME = SparkCompiler.class.getName();
+ private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private static final Log logger = LogFactory.getLog(SparkCompiler.class);
public SparkCompiler() {
@@ -141,6 +144,7 @@ public class SparkCompiler extends TaskC
protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
throws SemanticException {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
GenSparkUtils.getUtils().resetSequenceNumber();
ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
@@ -212,6 +216,8 @@ public class SparkCompiler extends TaskC
for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
}
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
}
@Override