Posted to commits@pig.apache.org by pr...@apache.org on 2015/02/05 16:59:25 UTC
svn commit: r1657614 - in /pig/branches/spark/src/org/apache/pig:
backend/hadoop/executionengine/spark/ tools/pigstats/ tools/pigstats/spark/
Author: praveen
Date: Thu Feb 5 15:59:24 2015
New Revision: 1657614
URL: http://svn.apache.org/r1657614
Log:
PIG-4393: Add stats and error reporting for Spark (Mohit via Praveen)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Removed:
pig/branches/spark/src/org/apache/pig/tools/pigstats/SparkStats.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java Thu Feb 5 15:59:24 2015
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class JobMetricsListener implements SparkListener {
+
+ private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
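+ // Bookkeeping, guarded by "this": the two id maps let task-end events be
+ // correlated back to a Spark job, allJobMetrics stores per-stage TaskMetrics
+ // keyed by "stageId_attemptId", and finishedJobIds feeds waitForJobToEnd().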
+ private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+ private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+ private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+ private final Set<Integer> finishedJobIds = Sets.newHashSet();
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+ }
+
+ @Override
+ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ int stageId = taskEnd.stageId();
+ int stageAttemptId = taskEnd.stageAttemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Cannot find job id for stage[" + stageId + "].");
+ } else {
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(taskEnd.taskMetrics());
+ }
+ }
+
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ int jobId = jobStart.jobId();
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for (int i = 0; i < size; i++) {
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
+ }
+ jobIdToStageId.put(jobId, intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+ finishedJobIds.add(jobEnd.jobId());
+ notify();
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+ }
+
+ public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+ return allJobMetrics.get(jobId);
+ }
+
+ public synchronized boolean waitForJobToEnd(int jobId) throws InterruptedException {
+ if (finishedJobIds.contains(jobId)) {
+ finishedJobIds.remove(jobId);
+ return true;
+ }
+
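+ // Not finished yet: block until onJobEnd() fires notify(), then return false
+ // so the caller re-checks -- the wake-up may be for a different job.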
+ wait();
+ return false;
+ }
+
+ public synchronized void cleanup(int jobId) {
+ allJobMetrics.remove(jobId);
+ jobIdToStageId.remove(jobId);
+ Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Integer> entry = iterator.next();
+ if (entry.getValue() == jobId) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public synchronized void reset() {
+ stageIdToJobId.clear();
+ jobIdToStageId.clear();
+ allJobMetrics.clear();
+ finishedJobIds.clear();
+ }
+}
\ No newline at end of file
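
A minimal usage sketch for the listener above (not part of this commit; the
job group name and the sample action are illustrative):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.executor.TaskMetrics;

    public class JobMetricsListenerDemo {
        public static void main(String[] args) throws InterruptedException {
            JavaSparkContext sc = new JavaSparkContext("local", "Spork-demo");
            JobMetricsListener listener = new JobMetricsListener();
            sc.sc().addSparkListener(listener);

            // Group this query's jobs so their ids can be looked up later.
            sc.setJobGroup("demo-group", "demo query", false);
            sc.parallelize(Arrays.asList(1, 2, 3)).count();  // any action

            int jobId = sc.statusTracker().getJobIdsForGroup("demo-group")[0];
            while (!listener.waitForJobToEnd(jobId)) {
                // Woken by a different job's notify(); keep waiting.
            }
            Map<String, List<TaskMetrics>> metrics = listener.getJobMetric(jobId);
            System.out.println("stages with metrics: " + metrics.keySet());
            listener.cleanup(jobId);
            sc.stop();
        }
    }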
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1657614&r1=1657613&r2=1657614&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Thu Feb 5 15:59:24 2015
@@ -7,7 +7,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
-import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
public class SparkExecutionEngine extends HExecutionEngine {
@@ -25,6 +25,6 @@ public class SparkExecutionEngine extend
@Override
public PigStats instantiatePigStats() {
- return new SimplePigStats();
+ return new SparkPigStats();
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1657614&r1=1657613&r2=1657614&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 5 15:59:24 2015
@@ -5,15 +5,21 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -72,11 +78,11 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.SparkStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
-import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
/**
@@ -89,7 +95,8 @@ public class SparkLauncher extends Launc
// Our connection to Spark. It needs to be static so that it can be reused
// across jobs, because a
// new SparkLauncher gets created for each job.
- private static SparkContext sparkContext = null;
+ private static JavaSparkContext sparkContext = null;
+ private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
public static BroadCastServer bcaster;
private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
@@ -99,6 +106,7 @@ public class SparkLauncher extends Launc
// it to be shared across SparkLaunchers. It gets cleared whenever we close
// the SparkContext.
// private static CacheConverter cacheConverter = null;
+ private String jobGroupID;
@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
@@ -128,9 +136,19 @@ public class SparkLauncher extends Launc
}
}
+ SparkPigStats sparkStats = (SparkPigStats)
+ pigContext.getExecutionEngine().instantiatePigStats();
+ PigStats.start(sparkStats);
+
startSparkIfNeeded();
+ // Set a unique group id for this query, so we can look up all Spark job ids
+ // related to this query.
+ jobGroupID = UUID.randomUUID().toString();
+ sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);
+ jobMetricsListener.reset();
+
String currentDirectoryPath = Paths.get(".").toAbsolutePath().normalize().toString() + "/";
- startSparkJob(pigContext,currentDirectoryPath);
+ startSparkJob(pigContext, currentDirectoryPath);
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
physicalPlan, POStore.class);
POStore firstStore = stores.getFirst();
@@ -144,7 +162,7 @@ public class SparkLauncher extends Launc
Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
convertMap.put(POLoad.class, new LoadConverter(pigContext,
- physicalPlan, sparkContext));
+ physicalPlan, sparkContext.sc()));
convertMap.put(POStore.class, new StoreConverter(pigContext));
convertMap.put(POForEach.class, new ForEachConverter(confBytes));
convertMap.put(POFilter.class, new FilterConverter());
@@ -155,7 +173,7 @@ public class SparkLauncher extends Launc
convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
convertMap.put(POLimit.class, new LimitConverter());
convertMap.put(PODistinct.class, new DistinctConverter());
- convertMap.put(POUnion.class, new UnionConverter(sparkContext));
+ convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
convertMap.put(POSort.class, new SortConverter());
convertMap.put(POSplit.class, new SplitConverter());
convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
@@ -166,15 +184,50 @@ public class SparkLauncher extends Launc
Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
- SparkStats stats = new SparkStats();
-
+ Set<Integer> seenJobIDs = new HashSet<Integer>();
for (POStore poStore : stores) {
physicalToRDD(physicalPlan, poStore, rdds, convertMap);
- stats.addOutputInfo(poStore, 1, 1, true, c); // TODO: use real
- // values
+ for (int jobID : getJobIDs(seenJobIDs)) {
+ SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+ jobMetricsListener, sparkContext, sparkStats, c);
+ }
}
+
cleanUpSparkJob(pigContext,currentDirectoryPath);
- return stats;
+ sparkStats.finish();
+ return sparkStats;
+ }
+
+
+ /**
+ * In Spark, currently only async actions return a job id.
+ * There is no async equivalent of actions like saveAsNewAPIHadoopFile().
+ *
+ * The only other way to get a job id is to register a "job group ID" with the
+ * Spark context and request all job ids corresponding to that job group via
+ * getJobIdsForGroup.
+ *
+ * However, getJobIdsForGroup does not guarantee the order of the elements in
+ * its result.
+ *
+ * This method simply returns the previously unseen job ids.
+ *
+ * @param seenJobIDs job ids in the job group that are already seen
+ * @return Spark job ids not seen before
+ */
+ private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+ Set<Integer> groupJobIDs = new HashSet<Integer>(Arrays.asList(
+ ArrayUtils.toObject(sparkContext.statusTracker()
+ .getJobIdsForGroup(jobGroupID))));
+ groupJobIDs.removeAll(seenJobIDs);
+ List<Integer> unseenJobIDs = new ArrayList<Integer>(groupJobIDs);
+ if (unseenJobIDs.isEmpty()) {
+ throw new RuntimeException("Expected at least one unseen jobID "
+ + "in this call to getJobIdsForGroup, but got none");
+ }
+
+ seenJobIDs.addAll(unseenJobIDs);
+ return unseenJobIDs;
}
private void cleanUpSparkJob(PigContext pigContext, String currentDirectoryPath) {
@@ -341,11 +394,11 @@ public class SparkLauncher extends Launc
// System.setProperty("spark.shuffle.memoryFraction", "0.0");
// System.setProperty("spark.storage.memoryFraction", "0.0");
- JavaSparkContext javaContext = new JavaSparkContext(master,
+ sparkContext = new JavaSparkContext(master,
"Spork", sparkHome, jars.toArray(new String[jars.size()]));
- sparkContext = javaContext.sc();
- sparkContext.addSparkListener(new StatsReportListener());
- sparkContext.addSparkListener(new JobLogger());
+ sparkContext.sc().addSparkListener(new StatsReportListener());
+ sparkContext.sc().addSparkListener(new JobLogger());
+ sparkContext.sc().addSparkListener(jobMetricsListener);
// cacheConverter = new CacheConverter();
}
}
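
One detail in getJobIDs above worth calling out: getJobIdsForGroup returns a
primitive int[], which has to be boxed before it can be diffed against the
seenJobIDs set. A standalone sketch of just that step (values illustrative,
not from this commit):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.commons.lang3.ArrayUtils;

    public class UnseenIdsDemo {
        public static void main(String[] args) {
            int[] fromTracker = {3, 5, 8};   // stands in for getJobIdsForGroup(...)
            Set<Integer> seen = new HashSet<Integer>(Arrays.asList(3));

            // ArrayUtils.toObject boxes int[] into Integer[] so Arrays.asList works.
            Set<Integer> unseen = new HashSet<Integer>(
                    Arrays.asList(ArrayUtils.toObject(fromTracker)));
            unseen.removeAll(seen);
            seen.addAll(unseen);
            System.out.println(unseen);      // prints [5, 8] (order may vary)
        }
    }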
Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Thu Feb 5 15:59:24 2015
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+import scala.Option;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkJobStats extends JobStats {
+
+ private int jobId;
+ private Map<String, Long> stats = Maps.newLinkedHashMap();
+
+ protected SparkJobStats(int jobId, PigStats.JobGraph plan) {
+ this(String.valueOf(jobId), plan);
+ this.jobId = jobId;
+ }
+
+ protected SparkJobStats(String jobId, PigStats.JobGraph plan) {
+ super(jobId, plan);
+ }
+
+ public void addOutputInfo(POStore poStore, boolean success,
+ JobMetricsListener jobMetricsListener,
+ Configuration conf) {
+ // TODO: Compute #records
+ long bytes = getOutputSize(poStore, conf);
+ OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
+ bytes, 1, success);
+ outputStats.setPOStore(poStore);
+ outputStats.setConf(conf);
+ outputs.add(outputStats);
+ }
+
+ public void collectStats(JobMetricsListener jobMetricsListener) {
+ Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+ if (taskMetrics == null) {
+ throw new RuntimeException("No task metrics available for jobId " + jobId);
+ }
+
+ stats = combineTaskMetrics(taskMetrics);
+ }
+
+ public Map<String, Long> getStats() {
+ return stats;
+ }
+
+ private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+ Map<String, Long> results = Maps.newLinkedHashMap();
+
+ long executorDeserializeTime = 0;
+ long executorRunTime = 0;
+ long resultSize = 0;
+ long jvmGCTime = 0;
+ long resultSerializationTime = 0;
+ long memoryBytesSpilled = 0;
+ long diskBytesSpilled = 0;
+ long bytesRead = 0;
+ long bytesWritten = 0;
+ long remoteBlocksFetched = 0;
+ long localBlocksFetched = 0;
+ long fetchWaitTime = 0;
+ long remoteBytesRead = 0;
+ long shuffleBytesWritten = 0;
+ long shuffleWriteTime = 0;
+ boolean inputMetricExist = false;
+ boolean outputMetricExist = false;
+ boolean shuffleReadMetricExist = false;
+ boolean shuffleWriteMetricExist = false;
+
+ for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+ }
+
+ if (!taskMetrics.outputMetrics().isEmpty()) {
+ outputMetricExist = true;
+ bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
+ }
+
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+
+ }
+ }
+ }
+ }
+
+ results.put("EexcutorDeserializeTime", executorDeserializeTime);
+ results.put("ExecutorRunTime", executorRunTime);
+ results.put("ResultSize", resultSize);
+ results.put("JvmGCTime", jvmGCTime);
+ results.put("ResultSerializationTime", resultSerializationTime);
+ results.put("MemoryBytesSpilled", memoryBytesSpilled);
+ results.put("DiskBytesSpilled", diskBytesSpilled);
+ if (inputMetricExist) {
+ results.put("BytesRead", bytesRead);
+ }
+
+ if (outputMetricExist) {
+ results.put("BytesWritten", bytesWritten);
+ }
+
+ if (shuffleReadMetricExist) {
+ results.put("RemoteBlocksFetched", remoteBlocksFetched);
+ results.put("LocalBlocksFetched", localBlocksFetched);
+ results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+ results.put("FetchWaitTime", fetchWaitTime);
+ results.put("RemoteBytesRead", remoteBytesRead);
+ }
+
+ if (shuffleWriteMetricExist) {
+ results.put("ShuffleBytesWritten", shuffleBytesWritten);
+ results.put("ShuffleWriteTime", shuffleWriteTime);
+ }
+
+ return results;
+ }
+
+ @Override
+ public String getJobId() {
+ return String.valueOf(jobId);
+ }
+
+ @Override
+ public void accept(PlanVisitor v) throws FrontendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getDisplayString() {
+ return null;
+ }
+
+ @Override
+ public int getNumberMaps() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberReduces() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxMapTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMinMapTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getAvgMapTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxReduceTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMinReduceTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getAvgREduceTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMapInputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMapOutputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getReduceInputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getReduceOutputRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getSMMSpillCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getProactiveSpillCountObjects() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getProactiveSpillCountRecs() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Counters getHadoopCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, Long> getMultiStoreCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, Long> getMultiInputCounters() {
+ throw new UnsupportedOperationException();
+ }
+}
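
combineTaskMetrics above reads Scala Option fields from Java, summing a metric
group only when it is present for a task. The pattern in isolation (a sketch,
not from this commit):

    import java.util.List;

    import org.apache.spark.executor.ShuffleReadMetrics;
    import org.apache.spark.executor.TaskMetrics;
    import scala.Option;

    public class OptionMetricsDemo {
        // Sums remote shuffle bytes, skipping tasks that did no shuffle reads.
        static long remoteBytesRead(List<TaskMetrics> tasks) {
            long total = 0;
            for (TaskMetrics tm : tasks) {
                Option<ShuffleReadMetrics> o = tm.shuffleReadMetrics();
                if (o.isDefined()) {   // equivalent to !o.isEmpty() above
                    total += o.get().remoteBytesRead();
                }
            }
            return total;
        }
    }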
Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Thu Feb 5 15:59:24 2015
@@ -0,0 +1,130 @@
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class SparkPigStats extends PigStats {
+
+ private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
+
+ public SparkPigStats() {
+ jobPlan = new JobGraph();
+ }
+
+ public void addJobStats(POStore poStore, int jobId,
+ JobMetricsListener jobMetricsListener,
+ JavaSparkContext sparkContext,
+ Configuration conf) {
+ boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
+ SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+ jobStats.setSuccessful(isSuccess);
+ jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+ jobStats.collectStats(jobMetricsListener);
+ jobPlan.add(jobStats);
+ }
+
+ public void finish() {
+ super.stop();
+ display();
+ }
+
+ private void display() {
+ Iterator<JobStats> iter = jobPlan.iterator();
+ while (iter.hasNext()) {
+ SparkJobStats js = (SparkJobStats) iter.next();
+ LOG.info("Spark Job [" + js.getJobId() + "] Metrics");
+ Map<String, Long> stats = js.getStats();
+ if (stats == null) {
+ LOG.info("No statistics found for job " + js.getJobId());
+ // Keep going: the remaining jobs may still have metrics to report.
+ continue;
+ }
+
+ for (Map.Entry<String, Long> entry : stats.entrySet()) {
+ LOG.info("\t" + entry.getKey() + " : " + entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ public JobClient getJobClient() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isEmbedded() {
+ return false;
+ }
+
+ @Override
+ public Map<String, List<PigStats>> getAllStats() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<String> getAllErrorMessages() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Properties getPigProperties() {
+ return null;
+ }
+
+ @Override
+ public String getOutputAlias(String location) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public long getSMMSpillCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getProactiveSpillCountObjects() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getProactiveSpillCountRecords() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getBytesWritten() {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public long getRecordWritten() {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int getNumberJobs() {
+ return jobPlan.size();
+ }
+
+ @Override
+ public OutputStats result(String alias) {
+ return null;
+ }
+}
Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1657614&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Thu Feb 5 15:59:24 2015
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkStatsUtil {
+
+ public static void waitForJobAddStats(int jobID,
+ POStore poStore,
+ JobMetricsListener jobMetricsListener,
+ JavaSparkContext sparkContext,
+ SparkPigStats sparkPigStats,
+ JobConf jobConf)
+ throws InterruptedException {
+ // Even though we are not making any async calls to Spark,
+ // the SparkStatusTracker can still return RUNNING status
+ // for a finished job.
+ // Looks like there is a race condition between the Spark
+ // "event bus" thread updating its internal listener and
+ // this driver thread calling SparkStatusTracker.
+ // To work around this, we wait for this job to "finish".
+ while (!jobMetricsListener.waitForJobToEnd(jobID)) {
+ // Woken by a different job finishing; keep waiting for this one.
+ }
+ sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
+ sparkContext, jobConf);
+ jobMetricsListener.cleanup(jobID);
+ }
+
+ public static boolean isJobSuccess(int jobID,
+ JavaSparkContext sparkContext) {
+ JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
+ if (status == JobExecutionStatus.SUCCEEDED) {
+ return true;
+ } else if (status != JobExecutionStatus.FAILED) {
+ throw new RuntimeException("Unexpected job execution status " +
+ status);
+ }
+
+ return false;
+ }
+
+ private static SparkJobInfo getJobInfo(int jobID,
+ JavaSparkContext sparkContext) {
+ SparkJobInfo jobInfo = sparkContext.statusTracker().getJobInfo(jobID);
+ if (jobInfo == null) {
+ throw new RuntimeException("No jobInfo available for jobID "
+ + jobID);
+ }
+
+ return jobInfo;
+ }
+}
\ No newline at end of file