Posted to commits@pig.apache.org by pr...@apache.org on 2015/02/05 16:59:25 UTC

svn commit: r1657614 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/ tools/pigstats/ tools/pigstats/spark/

Author: praveen
Date: Thu Feb  5 15:59:24 2015
New Revision: 1657614

URL: http://svn.apache.org/r1657614
Log:
PIG-4393: Add stats and error reporting for Spark (Mohit via Praveen)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Removed:
    pig/branches/spark/src/org/apache/pig/tools/pigstats/SparkStats.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java Thu Feb  5 15:59:24 2015
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JobMetricsListener implements SparkListener {
+
+  private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+  private final Set<Integer> finishedJobIds = Sets.newHashSet();
+
+  @Override
+  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+  }
+
+  @Override
+  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+  }
+
+  @Override
+  public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+  }
+
+  @Override
+  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+  }
+
+  @Override
+  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+      int stageId = taskEnd.stageId();
+      int stageAttemptId = taskEnd.stageAttemptId();
+      String stageIdentifier = stageId + "_" + stageAttemptId;
+      Integer jobId = stageIdToJobId.get(stageId);
+      if (jobId == null) {
+          LOG.warn("Cannot find job id for stage[" + stageId + "].");
+      } else {
+          Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+          if (jobMetrics == null) {
+              jobMetrics = Maps.newHashMap();
+              allJobMetrics.put(jobId, jobMetrics);
+          }
+          List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+          if (stageMetrics == null) {
+              stageMetrics = Lists.newLinkedList();
+              jobMetrics.put(stageIdentifier, stageMetrics);
+          }
+          stageMetrics.add(taskEnd.taskMetrics());
+      }
+  }
+
+  @Override
+  public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+      int jobId = jobStart.jobId();
+      int size = jobStart.stageIds().size();
+      int[] intStageIds = new int[size];
+      for (int i = 0; i < size; i++) {
+          Integer stageId = (Integer) jobStart.stageIds().apply(i);
+          intStageIds[i] = stageId;
+          stageIdToJobId.put(stageId, jobId);
+      }
+      jobIdToStageId.put(jobId, intStageIds);
+  }
+
+  @Override
+  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+      finishedJobIds.add(jobEnd.jobId());
+      notify();
+  }
+
+  @Override
+  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+  }
+
+  @Override
+  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+  }
+
+  @Override
+  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+  }
+
+  @Override
+  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+  }
+
+  @Override
+  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+  }
+
+  @Override
+  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+  }
+
+  @Override
+  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+  }
+
+  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+      return allJobMetrics.get(jobId);
+  }
+
+  public synchronized boolean waitForJobToEnd(int jobId) throws InterruptedException {
+      if (finishedJobIds.contains(jobId)) {
+        finishedJobIds.remove(jobId);
+        return true;
+      }
+
+      wait();
+      return false;
+  }
+
+  public synchronized void cleanup(int jobId) {
+      allJobMetrics.remove(jobId);
+      jobIdToStageId.remove(jobId);
+      Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+      while (iterator.hasNext()) {
+          Map.Entry<Integer, Integer> entry = iterator.next();
+          if (entry.getValue() == jobId) {
+              iterator.remove();
+          }
+      }
+  }
+
+  public synchronized void reset() {
+      stageIdToJobId.clear();
+      jobIdToStageId.clear();
+      allJobMetrics.clear();
+      finishedJobIds.clear();
+  }
+}
\ No newline at end of file
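
For context, a minimal sketch of how a driver thread is meant to use this listener. The JavaSparkContext variable jsc and the jobId are illustrative, not part of this commit; the while loop reflects that waitForJobToEnd() returns false when the wakeup was for some other job:

    JobMetricsListener listener = new JobMetricsListener();
    jsc.sc().addSparkListener(listener);
    // ... run the action(s) that make up Spark job `jobId` ...
    while (!listener.waitForJobToEnd(jobId)) {
        // Woken for a different job (or spuriously); keep waiting for ours.
    }
    // Per-stage metrics are keyed by "stageId_stageAttemptId".
    Map<String, List<TaskMetrics>> metrics = listener.getJobMetric(jobId);
    listener.cleanup(jobId); // drop the job's bookkeeping once consumed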

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1657614&r1=1657613&r2=1657614&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Thu Feb  5 15:59:24 2015
@@ -7,7 +7,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
-import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 
 public class SparkExecutionEngine extends HExecutionEngine {
 
@@ -25,6 +25,6 @@ public class SparkExecutionEngine extend
 
     @Override
     public PigStats instantiatePigStats() {
-        return new SimplePigStats();
+        return new SparkPigStats();
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1657614&r1=1657613&r2=1657614&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb  5 15:59:24 2015
@@ -5,15 +5,21 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -72,11 +78,11 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.SparkStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 
 /**
@@ -89,7 +95,8 @@ public class SparkLauncher extends Launc
     // Our connection to Spark. It needs to be static so that it can be reused
     // across jobs, because a
     // new SparkLauncher gets created for each job.
-    private static SparkContext sparkContext = null;
+    private static JavaSparkContext sparkContext = null;
+    private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
 
     public static BroadCastServer bcaster;
     private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
@@ -99,6 +106,7 @@ public class SparkLauncher extends Launc
     // it to be shared across SparkLaunchers. It gets cleared whenever we close
     // the SparkContext.
     // private static CacheConverter cacheConverter = null;
+    private String jobGroupID;
 
     @Override
     public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
@@ -128,9 +136,19 @@ public class SparkLauncher extends Launc
             }
         }
 
+        SparkPigStats sparkStats = (SparkPigStats)
+            pigContext.getExecutionEngine().instantiatePigStats();
+        PigStats.start(sparkStats);
+
         startSparkIfNeeded();
+        // Set a unique group id for this query, so we can look up all Spark
+        // job ids related to this query.
+        jobGroupID = UUID.randomUUID().toString();
+        sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);
+        jobMetricsListener.reset();
+
         String currentDirectoryPath = Paths.get(".").toAbsolutePath().normalize().toString() + "/";
-        startSparkJob(pigContext,currentDirectoryPath);
+        startSparkJob(pigContext, currentDirectoryPath);
         LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
                 physicalPlan, POStore.class);
         POStore firstStore = stores.getFirst();
@@ -144,7 +162,7 @@ public class SparkLauncher extends Launc
         Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
 
         convertMap.put(POLoad.class, new LoadConverter(pigContext,
-                physicalPlan, sparkContext));
+                physicalPlan, sparkContext.sc()));
         convertMap.put(POStore.class, new StoreConverter(pigContext));
         convertMap.put(POForEach.class, new ForEachConverter(confBytes));
         convertMap.put(POFilter.class, new FilterConverter());
@@ -155,7 +173,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
         convertMap.put(POLimit.class, new LimitConverter());
         convertMap.put(PODistinct.class, new DistinctConverter());
-        convertMap.put(POUnion.class, new UnionConverter(sparkContext));
+        convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
         convertMap.put(POSort.class, new SortConverter());
         convertMap.put(POSplit.class, new SplitConverter());
         convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
@@ -166,15 +184,50 @@ public class SparkLauncher extends Launc
 
         Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
 
-        SparkStats stats = new SparkStats();
-
+        Set<Integer> seenJobIDs = new HashSet<Integer>();
         for (POStore poStore : stores) {
             physicalToRDD(physicalPlan, poStore, rdds, convertMap);
-            stats.addOutputInfo(poStore, 1, 1, true, c); // TODO: use real
-            // values
+            for (int jobID : getJobIDs(seenJobIDs)) {
+                SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+                    jobMetricsListener, sparkContext, sparkStats, c);
+            }
         }
+
         cleanUpSparkJob(pigContext,currentDirectoryPath);
-        return stats;
+        sparkStats.finish();
+        return sparkStats;
+    }
+
+
+    /**
+     * In Spark, currently only async actions return a job id.
+     * There is no async equivalent of actions like saveAsNewAPIHadoopFile().
+     *
+     * The only other way to get a job id is to register a "job group ID" with the
+     * spark context and request all job ids corresponding to that job group via
+     * getJobIdsForGroup.
+     *
+     * However, getJobIdsForGroup does not guarantee the order of the elements
+     * in its result.
+     *
+     * This method simply returns the previously unseen job ids.
+     *
+     * @param seenJobIDs job ids in the job group that are already seen
+     * @return Spark job ids not seen before
+     */
+    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+        Set<Integer> groupJobIDs = new HashSet<Integer>(Arrays.asList(
+            ArrayUtils.toObject(sparkContext.statusTracker()
+                .getJobIdsForGroup(jobGroupID))));
+        groupJobIDs.removeAll(seenJobIDs);
+        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupJobIDs);
+        if (unseenJobIDs.size() == 0) {
+          throw new RuntimeException("Expected at least one unseen jobID " +
+              "in this call to getJobIdsForGroup, but got " + unseenJobIDs.size());
+        }
+
+        seenJobIDs.addAll(unseenJobIDs);
+        return unseenJobIDs;
     }
 
     private void cleanUpSparkJob(PigContext pigContext, String currentDirectoryPath) {
@@ -341,11 +394,11 @@ public class SparkLauncher extends Launc
 //            System.setProperty("spark.shuffle.memoryFraction", "0.0");
 //            System.setProperty("spark.storage.memoryFraction", "0.0");
 
-            JavaSparkContext javaContext = new JavaSparkContext(master,
+            sparkContext = new JavaSparkContext(master,
                     "Spork", sparkHome, jars.toArray(new String[jars.size()]));
-            sparkContext = javaContext.sc();
-            sparkContext.addSparkListener(new StatsReportListener());
-            sparkContext.addSparkListener(new JobLogger());
+            sparkContext.sc().addSparkListener(new StatsReportListener());
+            sparkContext.sc().addSparkListener(new JobLogger());
+            sparkContext.sc().addSparkListener(jobMetricsListener);
             // cacheConverter = new CacheConverter();
         }
     }
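
The job-group bookkeeping above reduces to a few JavaSparkContext calls; a condensed, illustrative sketch (the local names are shorthand, not from this commit):

    String groupId = UUID.randomUUID().toString();
    jsc.setJobGroup(groupId, "Pig query to Spark cluster", false);
    // ... physicalToRDD() triggers the actions that run as jobs in this group ...
    int[] ids = jsc.statusTracker().getJobIdsForGroup(groupId);
    // The order of `ids` is not guaranteed, which is why getJobIDs()
    // diffs them against the set of job ids already seen.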

Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Thu Feb  5 15:59:24 2015
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+import scala.Option;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkJobStats extends JobStats {
+
+  private int jobId;
+  private Map<String, Long> stats = Maps.newLinkedHashMap();
+
+  protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
+      this(String.valueOf(jobId), plan);
+      this.jobId = jobId;
+  }
+
+  protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
+      super(jobId, plan);
+  }
+
+  public void addOutputInfo(POStore poStore, boolean success,
+                            JobMetricsListener jobMetricsListener,
+                            Configuration conf) {
+      // TODO: Compute #records
+      long bytes = getOutputSize(poStore, conf);
+      OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
+          bytes, 1, success);
+      outputStats.setPOStore(poStore);
+      outputStats.setConf(conf);
+      outputs.add(outputStats);
+  }
+
+  public void collectStats(JobMetricsListener jobMetricsListener) {
+      Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+      if (taskMetrics == null) {
+        throw new RuntimeException("No task metrics available for jobId " + jobId);
+      }
+
+      stats = combineTaskMetrics(taskMetrics);
+  }
+
+  public Map<String, Long> getStats() {
+      return stats;
+  }
+
+  private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+      Map<String, Long> results = Maps.newLinkedHashMap();
+
+      long executorDeserializeTime = 0;
+      long executorRunTime = 0;
+      long resultSize = 0;
+      long jvmGCTime = 0;
+      long resultSerializationTime = 0;
+      long memoryBytesSpilled = 0;
+      long diskBytesSpilled = 0;
+      long bytesRead = 0;
+      long bytesWritten = 0;
+      long remoteBlocksFetched = 0;
+      long localBlocksFetched = 0;
+      long fetchWaitTime = 0;
+      long remoteBytesRead = 0;
+      long shuffleBytesWritten = 0;
+      long shuffleWriteTime = 0;
+      boolean inputMetricExist = false;
+      boolean outputMetricExist = false;
+      boolean shuffleReadMetricExist = false;
+      boolean shuffleWriteMetricExist = false;
+
+      for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+        if (stageMetric != null) {
+          for (TaskMetrics taskMetrics : stageMetric) {
+            if (taskMetrics != null) {
+              executorDeserializeTime += taskMetrics.executorDeserializeTime();
+              executorRunTime += taskMetrics.executorRunTime();
+              resultSize += taskMetrics.resultSize();
+              jvmGCTime += taskMetrics.jvmGCTime();
+              resultSerializationTime += taskMetrics.resultSerializationTime();
+              memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+              diskBytesSpilled += taskMetrics.diskBytesSpilled();
+              if (!taskMetrics.inputMetrics().isEmpty()) {
+                inputMetricExist = true;
+                bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+              }
+
+              if (!taskMetrics.outputMetrics().isEmpty()) {
+                outputMetricExist = true;
+                bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
+              }
+
+              Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+              if (!shuffleReadMetricsOption.isEmpty()) {
+                shuffleReadMetricExist = true;
+                remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+                localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+                fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+                remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+              }
+
+              Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+              if (!shuffleWriteMetricsOption.isEmpty()) {
+                shuffleWriteMetricExist = true;
+                shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+                shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+              }
+
+            }
+          }
+        }
+      }
+
+      results.put("ExecutorDeserializeTime", executorDeserializeTime);
+      results.put("ExecutorRunTime", executorRunTime);
+      results.put("ResultSize", resultSize);
+      results.put("JvmGCTime", jvmGCTime);
+      results.put("ResultSerializationTime", resultSerializationTime);
+      results.put("MemoryBytesSpilled", memoryBytesSpilled);
+      results.put("DiskBytesSpilled", diskBytesSpilled);
+      if (inputMetricExist) {
+        results.put("BytesRead", bytesRead);
+      }
+
+      if (outputMetricExist) {
+        results.put("BytesWritten", bytesWritten);
+      }
+
+      if (shuffleReadMetricExist) {
+        results.put("RemoteBlocksFetched", remoteBlocksFetched);
+        results.put("LocalBlocksFetched", localBlocksFetched);
+        results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+        results.put("FetchWaitTime", fetchWaitTime);
+        results.put("RemoteBytesRead", remoteBytesRead);
+      }
+
+      if (shuffleWriteMetricExist) {
+        results.put("ShuffleBytesWritten", shuffleBytesWritten);
+        results.put("ShuffleWriteTime", shuffleWriteTime);
+      }
+
+      return results;
+  }
+
+  @Override
+  public String getJobId() {
+      return String.valueOf(jobId);
+  }
+
+  @Override
+  public void accept(PlanVisitor v) throws FrontendException {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getDisplayString() {
+      return null;
+  }
+
+  @Override
+  public int getNumberMaps() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNumberReduces() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getMaxMapTime() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getMinMapTime() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getAvgMapTime() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getMaxReduceTime() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getMinReduceTime() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getAvgREduceTime() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getMapInputRecords() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getMapOutputRecords() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getReduceInputRecords() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getReduceOutputRecords() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getSMMSpillCount() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getProactiveSpillCountObjects() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getProactiveSpillCountRecs() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Counters getHadoopCounters() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Long> getMultiStoreCounters() {
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Long> getMultiInputCounters() {
+      throw new UnsupportedOperationException();
+  }
+}
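
For reference, a sketch of how the aggregated metrics are consumed, mirroring SparkPigStats.display() below. The constructor is package-private and is invoked from SparkPigStats.addJobStats; the surrounding variables (jobId, jobPlan, LOG) are illustrative:

    SparkJobStats js = new SparkJobStats(jobId, jobPlan);
    js.collectStats(jobMetricsListener); // throws if no metrics exist for jobId
    for (Map.Entry<String, Long> e : js.getStats().entrySet()) {
        // Keys such as "ExecutorRunTime" are always present; "BytesRead" and
        // the shuffle keys appear only when those metrics were reported.
        LOG.info("\t" + e.getKey() + " : " + e.getValue());
    }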

Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Thu Feb  5 15:59:24 2015
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class SparkPigStats extends PigStats {
+
+    private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
+
+    public SparkPigStats() {
+      jobPlan = new JobGraph();
+    }
+
+    public void addJobStats(POStore poStore, int jobId,
+                            JobMetricsListener jobMetricsListener,
+                            JavaSparkContext sparkContext,
+                            Configuration conf) {
+        boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+        jobStats.setSuccessful(isSuccess);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobMetricsListener);
+        jobPlan.add(jobStats);
+    }
+
+    public void finish() {
+        super.stop();
+        display();
+    }
+
+    private void display() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            SparkJobStats js = (SparkJobStats) iter.next();
+            LOG.info("Spark Job [" + js.getJobId() + "] Metrics");
+            Map<String, Long> stats = js.getStats();
+            if (stats == null) {
+                LOG.info("No statistics found for job " + js.getJobId());
+                continue;
+            }
+
+            Iterator<Map.Entry<String, Long>> statIt = stats.entrySet().iterator();
+            while (statIt.hasNext()) {
+                Map.Entry<String, Long> pairs = statIt.next();
+                LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue());
+            }
+        }
+    }
+
+    @Override
+    public JobClient getJobClient() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Properties getPigProperties() {
+        return null;
+    }
+
+    @Override
+    public String getOutputAlias(String location) {
+        // TODO
+        return null;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getProactiveSpillCountRecords() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getBytesWritten() {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public long getRecordWritten() {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public int getNumberJobs() {
+        return jobPlan.size();
+    }
+
+    @Override
+    public OutputStats result(String alias) {
+        return null;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Thu Feb  5 15:59:24 2015
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkStatsUtil {
+
+  public static void waitForJobAddStats(int jobID,
+                                        POStore poStore,
+                                        JobMetricsListener jobMetricsListener,
+                                        JavaSparkContext sparkContext,
+                                        SparkPigStats sparkPigStats,
+                                        JobConf jobConf)
+      throws InterruptedException {
+      // Even though we are not making any async calls to Spark,
+      // the SparkStatusTracker can still return RUNNING status
+      // for a finished job.
+      // There appears to be a race condition between the Spark
+      // "event bus" thread updating its internal listeners and
+      // this driver thread calling SparkStatusTracker.
+      // To work around this, we wait for the job to "finish".
+      while (!jobMetricsListener.waitForJobToEnd(jobID)) { } // a wakeup may be for another job
+      sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
+          sparkContext, jobConf);
+      jobMetricsListener.cleanup(jobID);
+  }
+
+  public static boolean isJobSuccess(int jobID,
+                                    JavaSparkContext sparkContext) {
+      JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
+      if (status == JobExecutionStatus.SUCCEEDED) {
+        return true;
+      } else if (status != JobExecutionStatus.FAILED) {
+        throw new RuntimeException("Unexpected job execution status " +
+            status);
+      }
+
+      return false;
+  }
+
+  private static SparkJobInfo getJobInfo(int jobID,
+                                         JavaSparkContext sparkContext) {
+      SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
+      if (jobInfo == null) {
+        throw new RuntimeException("No jobInfo available for jobID "
+            + jobID);
+      }
+
+      return jobInfo;
+  }
+}
\ No newline at end of file
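
Taken together, the new pieces wire up as follows. This is a condensed, illustrative restatement of SparkLauncher.launchPig, not runnable on its own; the locals match the diff above:

    SparkPigStats sparkStats = (SparkPigStats)
        pigContext.getExecutionEngine().instantiatePigStats();
    PigStats.start(sparkStats);
    sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);
    jobMetricsListener.reset();
    for (POStore poStore : stores) {
        physicalToRDD(physicalPlan, poStore, rdds, convertMap); // runs the Spark job(s)
        for (int jobID : getJobIDs(seenJobIDs)) {
            SparkStatsUtil.waitForJobAddStats(jobID, poStore,
                jobMetricsListener, sparkContext, sparkStats, c);
        }
    }
    sparkStats.finish(); // stops the clock and logs per-job metrics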