Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/18 19:24:46 UTC

svn commit: r1646506 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/spark/ exec/spark/status/ log/ parse/spark/

Author: xuefu
Date: Thu Dec 18 18:24:45 2014
New Revision: 1646506

URL: http://svn.apache.org/r1646506
Log:
HIVE-9136: Profile query compiler [Spark Branch] (Chao via Xuefu)
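
The pattern the patch applies throughout is a PerfLogger begin/end pair around
each compile- and run-time phase. A minimal sketch of that usage, assuming only
the thread-local PerfLogger API the diffs below exercise (the wrapper class and
method are hypothetical, and unlike the patch itself the sketch pairs the calls
with try/finally so an exception cannot skip the matching PerfLogEnd):

    import org.apache.hadoop.hive.ql.log.PerfLogger;

    public class PhaseTimingSketch {
      private static final String CLASS_NAME = PhaseTimingSketch.class.getName();
      // PerfLogger.getPerfLogger() hands out a per-thread instance, so
      // begin/end pairs for one query never race with another thread's.
      private final PerfLogger perfLogger = PerfLogger.getPerfLogger();

      public void runPhase() {
        // Begin and end must use the same key; the elapsed time is
        // recorded and logged when the matching PerfLogEnd fires.
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
        try {
          // ... the phase being profiled ...
        } finally {
          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
        }
      }
    }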

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Thu Dec 18 18:24:45 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -44,6 +45,8 @@ import org.apache.hadoop.hive.serde2.obj
 public class SparkHashTableSinkOperator
     extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
+  private final String CLASS_NAME = this.getClass().getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected static final Log LOG = LogFactory.getLog(SparkHashTableSinkOperator.class.getName());
 
   private HashTableSinkOperator htsOperator;
@@ -90,6 +93,7 @@ public class SparkHashTableSinkOperator
 
   protected void flushToFile(MapJoinPersistableTableContainer tableContainer,
       byte tag) throws IOException, HiveException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
     MapredLocalWork localWork = getExecContext().getLocalWork();
     BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
     Path inputPath = getExecContext().getCurrentInputPath();
@@ -151,6 +155,7 @@ public class SparkHashTableSinkOperator
     FileStatus status = fs.getFileStatus(path);
     htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path +
       " (" + status.getLen() + " bytes)");
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
   }
 
   public void setTag(byte tag) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Thu Dec 18 18:24:45 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -56,7 +57,6 @@ import java.util.List;
  *
  */
 public class SparkMapRecordHandler extends SparkRecordHandler {
-
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
   public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
@@ -67,6 +67,7 @@ public class SparkMapRecordHandler exten
   private ExecMapperContext execContext;
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
 
     isLogInfoEnabled = l4j.isInfoEnabled();
@@ -136,6 +137,7 @@ public class SparkMapRecordHandler exten
         throw new RuntimeException("Map operator initialization failed", e);
       }
     }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Thu Dec 18 18:24:45 2014
@@ -27,12 +27,16 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
 import com.google.common.base.Preconditions;
 
 public class SparkPlan {
+  private final String CLASS_NAME = SparkPlan.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+
   private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
   private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
   private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
@@ -40,6 +44,7 @@ public class SparkPlan {
   private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
 
   public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
     Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
         = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
     for (SparkTran tran : getAllTrans()) {
@@ -75,6 +80,7 @@ public class SparkPlan {
       }
     }
 
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
     return finalRDD;
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Dec 18 18:24:45 2014
@@ -22,9 +22,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -32,9 +39,6 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -45,15 +49,14 @@ import org.apache.hadoop.hive.ql.stats.S
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import com.google.common.base.Preconditions;
 
 public class SparkPlanGenerator {
+  private final String CLASS_NAME = SparkPlanGenerator.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
 
   private JavaSparkContext sc;
@@ -82,25 +85,29 @@ public class SparkPlanGenerator {
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
     SparkPlan sparkPlan = new SparkPlan();
     cloneToWork = sparkWork.getCloneToWork();
     workToTranMap.clear();
     workToParentWorkTranMap.clear();
 
     for (BaseWork work : sparkWork.getAllWork()) {
-      SparkTran tran;
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+      SparkTran tran = generate(work);
       SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
-      tran = generate(work);
       sparkPlan.addTran(tran);
       sparkPlan.connect(parentTran, tran);
       workToTranMap.put(work, tran);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
     }
 
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
     return sparkPlan;
   }
 
   // Generate (possibly get from a cached result) parent SparkTran
-  private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
+  private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
+                                       BaseWork work) throws Exception {
     if (cloneToWork.containsKey(work)) {
       BaseWork originalWork = cloneToWork.get(work);
       if (workToParentWorkTranMap.containsKey(originalWork)) {
@@ -208,15 +215,17 @@ public class SparkPlanGenerator {
     // Make sure we'll use a different plan path from the original one
     HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
     try {
-      cloned.setPartitionerClass((Class<? extends Partitioner>) (Class.forName(HiveConf.getVar(cloned,
-        HiveConf.ConfVars.HIVEPARTITIONER))));
+      cloned.setPartitionerClass((Class<? extends Partitioner>)
+          (Class.forName(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))));
     } catch (ClassNotFoundException e) {
-      String msg = "Could not find partitioner class: " + e.getMessage() + " which is specified by: " +
+      String msg = "Could not find partitioner class: " + e.getMessage() +
+          " which is specified by: " +
         HiveConf.ConfVars.HIVEPARTITIONER.varname;
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
-      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
+      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
+          scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
       Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (MapWork) work);
@@ -224,7 +233,8 @@ public class SparkPlanGenerator {
         MergeFileWork mergeFileWork = (MergeFileWork) work;
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName());
         cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
-        cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
+        cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+            FileOutputFormat.class);
       } else {
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
       }

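Note the trailing dot in SPARK_CREATE_TRAN ("SparkCreateTran."): generate()
appends work.getName() to it, giving every BaseWork its own timer key. A hedged
sketch of the keying scheme, reusing the CLASS_NAME and perfLogger fields
declared in the diff above; workName and its value are illustrative:

    // One timer entry per work unit, e.g. "SparkCreateTran.Map 1"
    // ("Map 1" is an illustrative Hive on Spark work name).
    String workName = "Map 1";  // stands in for work.getName()
    String key = PerfLogger.SPARK_CREATE_TRAN + workName;
    perfLogger.PerfLogBegin(CLASS_NAME, key);
    // ... build the SparkTran for this work ...
    perfLogger.PerfLogEnd(CLASS_NAME, key);
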
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java Thu Dec 18 18:24:45 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -33,6 +34,8 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 public abstract class SparkRecordHandler {
+  protected final String CLASS_NAME = this.getClass().getName();
+  protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private final Log LOG = LogFactory.getLog(this.getClass());
 
   // used to log memory usage periodically

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Thu Dec 18 18:24:45 2014
@@ -34,11 +34,11 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -106,6 +106,7 @@ public class SparkReduceRecordHandler ex
   private MapredLocalWork localWork = null;
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
 
     rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
@@ -230,7 +231,7 @@ public class SparkReduceRecordHandler ex
         throw new RuntimeException("Reduce operator initialization failed", e);
       }
     }
-
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Dec 18 18:24:45 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -75,6 +76,8 @@ import org.apache.hadoop.util.StringUtil
 import com.google.common.collect.Lists;
 
 public class SparkTask extends Task<SparkWork> {
+  private final String CLASS_NAME = SparkTask.class.getName();
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final long serialVersionUID = 1L;
   private transient JobConf job;
   private transient ContentSummary inputSummary;
@@ -100,7 +103,10 @@ public class SparkTask extends Task<Spar
       SparkWork sparkWork = getWork();
       sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
 
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
         SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
@@ -198,6 +204,7 @@ public class SparkTask extends Task<Spar
       if (w instanceof MapWork) {
         List<BaseWork> parents = work.getParents(w);
         boolean candidate = true;
+        // TODO: since we don't have UnionWork anymore, can we simplify this?
         for (BaseWork parent: parents) {
           if (!(parent instanceof UnionWork)) {
             candidate = false;
@@ -290,7 +297,7 @@ public class SparkTask extends Task<Spar
       } else {
         tableName = work.getLoadFileDesc().getDestinationCreateTable();
       }
-    Table table = null;
+    Table table;
     try {
       table = db.getTable(tableName);
     } catch (HiveException e) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Thu Dec 18 18:24:45 2014
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.spark.JobExecutionStatus;
 
@@ -41,6 +42,7 @@ public class SparkJobMonitor {
   private static final Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private transient LogHelper console;
+  private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private final int checkInterval = 1000;
   private final int printInterval = 3000;
   private long lastPrintTime;
@@ -63,6 +65,9 @@ public class SparkJobMonitor {
     Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = -1;
 
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
     while (true) {
       JobExecutionStatus state = sparkJobStatus.getState();
       try {
@@ -77,6 +82,7 @@ public class SparkJobMonitor {
           switch (state) {
           case RUNNING:
             if (!running) {
+              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
               // print job stages.
               console.printInfo("\nQuery Hive on Spark job[" +
                 sparkJobStatus.getJobId() + "] stages:");
@@ -140,10 +146,13 @@ public class SparkJobMonitor {
         }
       }
     }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     return rc;
   }
 
-  private void printStatus(Map<String, SparkStageProgress> progressMap, Map<String, SparkStageProgress> lastProgressMap) {
+  private void printStatus(Map<String, SparkStageProgress> progressMap,
+                           Map<String, SparkStageProgress> lastProgressMap) {
 
     // do not print duplicate status while still in middle of print interval.
     boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
@@ -170,9 +179,17 @@ public class SparkJobMonitor {
       } else {
         if (complete == total && !completed.contains(s)) {
           completed.add(s);
+
+          if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+          }
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
         }
         if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
           /* stage is started, but not complete */
+          if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+          }
           if (failed > 0) {
             reportBuffer.append(
               String.format(

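Stage timers use the same suffix scheme (SPARK_RUN_STAGE plus the stage id),
but the monitor only sees polled snapshots: a stage can first show up already
complete, never having been observed running. The startTimeHasMethod guard
covers that case by opening the timer just before closing it. A hedged sketch
of that logic as a hypothetical helper; the parameters stand in for the
monitor's per-stage counters and its loop variable s, and the dedup against
the completed set is omitted:

    static void timeStage(PerfLogger perfLogger, String className, String stageId,
                          int complete, int total, int running, int failed) {
      String stageKey = PerfLogger.SPARK_RUN_STAGE + stageId;
      if (complete == total) {
        // Finished between polls without ever being seen in progress:
        // open the timer now so PerfLogEnd has a matching begin.
        if (!perfLogger.startTimeHasMethod(stageKey)) {
          perfLogger.PerfLogBegin(className, stageKey);
        }
        perfLogger.PerfLogEnd(className, stageKey);
      } else if (complete > 0 || running > 0 || failed > 0) {
        // In progress: start the stage timer once, on first sighting.
        if (!perfLogger.startTimeHasMethod(stageKey)) {
          perfLogger.PerfLogBegin(className, stageKey);
        }
      }
    }
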
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Thu Dec 18 18:24:45 2014
@@ -65,6 +65,17 @@ public class PerfLogger {
   public static final String LOAD_HASHTABLE = "LoadHashtable";
   public static final String ORC_GET_SPLITS = "OrcGetSplits";
 
+  public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
+  public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
+  public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
+  public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob";
+  public static final String SPARK_RUN_JOB = "SparkRunJob";
+  public static final String SPARK_CREATE_TRAN = "SparkCreateTran.";
+  public static final String SPARK_RUN_STAGE = "SparkRunStage.";
+  public static final String SPARK_INIT_OPERATORS = "SparkInitializeOperators";
+  public static final String SPARK_GENERATE_TASK_TREE = "SparkGenerateTaskTree";
+  public static final String SPARK_FLUSH_HASHTABLE = "SparkFlushHashTable.";
+
   protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();
 
   protected final Map<String, Long> startTimes = new HashMap<String, Long>();

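The new constants split into two groups: plain keys (SparkBuildPlan,
SparkRunJob, ...) timed once per query, and dotted prefixes (SparkCreateTran.,
SparkRunStage., SparkFlushHashTable.) that callers suffix with a work, stage,
or operator name to get one timer per item. A hedged sketch of both styles,
assuming only the API this patch itself calls; the class and the stage id "3"
are illustrative:

    import org.apache.hadoop.hive.ql.log.PerfLogger;

    public class SparkPerfKeySketch {
      private static final String CLASS_NAME = SparkPerfKeySketch.class.getName();

      public static void main(String[] args) {
        PerfLogger perfLogger = PerfLogger.getPerfLogger();

        // Plain key: a single timer spanning the whole job.
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);

        // Dotted prefix plus item name: one timer per stage ("SparkRunStage.3").
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + "3");
        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + "3");

        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
      }
    }
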
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1646506&r1=1646505&r2=1646506&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Thu Dec 18 18:24:45 2014
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -89,6 +90,8 @@ import org.apache.hadoop.hive.ql.session
  * TODO: need to complete and make it fit to Spark.
  */
 public class SparkCompiler extends TaskCompiler {
+  private static final String CLASS_NAME = SparkCompiler.class.getName();
+  private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final Log logger = LogFactory.getLog(SparkCompiler.class);
 
   public SparkCompiler() {
@@ -141,6 +144,7 @@ public class SparkCompiler extends TaskC
   protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
     GenSparkUtils.getUtils().resetSequenceNumber();
 
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
@@ -212,6 +216,8 @@ public class SparkCompiler extends TaskC
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
     }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
   }
 
   @Override