You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/07/19 01:32:42 UTC

svn commit: r1802347 [2/2] - in /pig/trunk: ./ ivy/ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/tools/pigstats/spark/ test/org/apache/pig/test/

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Wed Jul 19 01:32:41 2017
@@ -25,11 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.util.Pair;
@@ -54,7 +55,6 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 public class SkewedJoinConverter implements
@@ -103,7 +103,7 @@ public class SkewedJoinConverter impleme
 
         // with partition id
         StreamPartitionIndexKeyFunction streamFun = new StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
-        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(streamFun);
+        JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(SparkShims.getInstance().flatMapFunction(streamFun));
 
         // Tuple2 RDD to Pair RDD
         JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>(
@@ -146,7 +146,7 @@ public class SkewedJoinConverter impleme
      * @param <R> be generic because it can be Optional<Tuple> or Tuple
      */
     private static class ToValueFunction<L, R> implements
-            FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
+            FlatMapFunctionAdapter<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
 
         private boolean[] innerFlags;
         private int[] schemaSize;
@@ -188,7 +188,7 @@ public class SkewedJoinConverter impleme
                             Tuple leftTuple = tf.newTuple();
                             if (!innerFlags[0]) {
                                 // left should be Optional<Tuple>
-                                Optional<Tuple> leftOption = (Optional<Tuple>) left;
+                                SparkShims.OptionalWrapper<L> leftOption = SparkShims.getInstance().wrapOptional(left);
                                 if (!leftOption.isPresent()) {
                                     // Add an empty left record for RIGHT OUTER JOIN.
                                     // Notice: if it is a skewed, only join the first reduce key
@@ -200,7 +200,7 @@ public class SkewedJoinConverter impleme
                                         return this.next();
                                     }
                                 } else {
-                                    leftTuple = leftOption.get();
+                                    leftTuple = (Tuple) leftOption.get();
                                 }
                             } else {
                                 leftTuple = (Tuple) left;
@@ -212,13 +212,13 @@ public class SkewedJoinConverter impleme
                             Tuple rightTuple = tf.newTuple();
                             if (!innerFlags[1]) {
                                 // right should be Optional<Tuple>
-                                Optional<Tuple> rightOption = (Optional<Tuple>) right;
+                                SparkShims.OptionalWrapper<R> rightOption = SparkShims.getInstance().wrapOptional(right);
                                 if (!rightOption.isPresent()) {
                                     for (int i = 0; i < schemaSize[1]; i++) {
                                         rightTuple.append(null);
                                     }
                                 } else {
-                                    rightTuple = rightOption.get();
+                                    rightTuple = (Tuple) rightOption.get();
                                 }
                             } else {
                                 rightTuple = (Tuple) right;
@@ -234,17 +234,17 @@ public class SkewedJoinConverter impleme
                             return result;
                         } catch (Exception e) {
                             log.warn(e);
+                            return null;
                         }
-                        return null;
                     }
                 };
             }
         }
 
         @Override
-        public Iterable<Tuple> call(
-                Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
-            return new Tuple2TransformIterable(input);
+        public Iterator<Tuple> call(
+                Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) throws Exception {
+            return new Tuple2TransformIterable(input).iterator();
         }
 
         private boolean isFirstReduceKey(PartitionIndexedKey pKey) {
@@ -413,7 +413,7 @@ public class SkewedJoinConverter impleme
 
                 return tuple_KeyValue;
             } catch (Exception e) {
-                System.out.print(e);
+                log.warn(e);
                 return null;
             }
         }
@@ -469,7 +469,7 @@ public class SkewedJoinConverter impleme
      * <p>
      * see: https://wiki.apache.org/pig/PigSkewedJoinSpec
      */
-    private static class StreamPartitionIndexKeyFunction implements FlatMapFunction<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
+    private static class StreamPartitionIndexKeyFunction implements FlatMapFunctionAdapter<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
 
         private SkewedJoinConverter poSkewedJoin;
         private final Broadcast<List<Tuple>> keyDist;
@@ -487,7 +487,8 @@ public class SkewedJoinConverter impleme
             this.defaultParallelism = defaultParallelism;
         }
 
-        public Iterable<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception {
+        @Override
+        public Iterator<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception {
             if (!initialized) {
                 Integer[] reducers = new Integer[1];
                 reducerMap = loadKeyDistribution(keyDist, reducers);
@@ -526,12 +527,12 @@ public class SkewedJoinConverter impleme
                 l.add(new Tuple2(pIndexKey, tuple));
             }
 
-            return l;
+            return l.iterator();
         }
     }
 
     /**
-     * user defined spark partitioner for skewed join
+     * User defined spark partitioner for skewed join
      */
     private static class SkewedJoinPartitioner extends Partitioner {
         private int numPartitions;
@@ -568,12 +569,8 @@ public class SkewedJoinConverter impleme
     }
 
     /**
-     * use parallelism from keyDist or the default parallelism to
+     * Use parallelism from keyDist or the default parallelism to
      * create user defined partitioner
-     *
-     * @param keyDist
-     * @param defaultParallelism
-     * @return
      */
     private SkewedJoinPartitioner buildPartitioner(Broadcast<List<Tuple>> keyDist, Integer defaultParallelism) {
         Integer parallelism = -1;
@@ -588,12 +585,7 @@ public class SkewedJoinConverter impleme
     }
 
     /**
-     * do all kinds of Join (inner, left outer, right outer, full outer)
-     *
-     * @param skewIndexedJavaPairRDD
-     * @param streamIndexedJavaPairRDD
-     * @param partitioner
-     * @return
+     * Do all kinds of Join (inner, left outer, right outer, full outer)
      */
     private JavaRDD<Tuple> doJoin(
             JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD,
@@ -616,25 +608,22 @@ public class SkewedJoinConverter impleme
             JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
                     join(streamIndexedJavaPairRDD, partitioner);
 
-            return resultKeyValue.mapPartitions(toValueFun);
+            return resultKeyValue.mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         } else if (innerFlags[0] && !innerFlags[1]) {
             // left outer join
-            JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
-                    leftOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
-            return resultKeyValue.mapPartitions(toValueFun);
+            return skewIndexedJavaPairRDD
+                    .leftOuterJoin(streamIndexedJavaPairRDD, partitioner)
+                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         } else if (!innerFlags[0] && innerFlags[1]) {
             // right outer join
-            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
-                    rightOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
-            return resultKeyValue.mapPartitions(toValueFun);
+            return skewIndexedJavaPairRDD
+                    .rightOuterJoin(streamIndexedJavaPairRDD, partitioner)
+                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         } else {
             // full outer join
-            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
-                    fullOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
-            return resultKeyValue.mapPartitions(toValueFun);
+            return skewIndexedJavaPairRDD
+                    .fullOuterJoin(streamIndexedJavaPairRDD, partitioner)
+                    .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Wed Jul 19 01:32:41 2017
@@ -22,25 +22,26 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 @SuppressWarnings("serial")
 public class SortConverter implements RDDConverter<Tuple, Tuple, POSort> {
     private static final Log LOG = LogFactory.getLog(SortConverter.class);
 
-    private static final FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
+    private static final FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSort sortOperator)
@@ -57,13 +58,13 @@ public class SortConverter implements RD
 
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
                 sortOperator.getMComparator(), true, parallelism);
-        JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
+        JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(TO_VALUE_FUNCTION));
 
         return mapped.rdd();
     }
 
     private static class ToValueFunction implements
-            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+            FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 
@@ -84,8 +85,8 @@ public class SortConverter implements RD
         }
 
         @Override
-        public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
-            return new Tuple2TransformIterable(input);
+        public Iterator<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+            return new Tuple2TransformIterable(input).iterator();
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java Wed Jul 19 01:32:41 2017
@@ -19,18 +19,18 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.spark.PairFlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.spark.api.java.function.Function2;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -40,12 +40,11 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.rdd.RDD;
-  /*
-   sort the sample data and convert the sample data to the format (all,{(sampleEle1),(sampleEle2),...})
 
-   */
+/*
+ Sorts the sample data and converts it to the format (all,{(sampleEle1),(sampleEle2),...}).
+ */
 @SuppressWarnings("serial")
 public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple, POSampleSortSpark> {
     private static final Log LOG = LogFactory.getLog(SparkSampleSortConverter.class);
@@ -66,14 +65,14 @@ public class SparkSampleSortConverter im
          //sort sample data
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
          //convert every element in sample data from element to (all, element) format
-        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new AggregateFunction());
+        JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkShims.getInstance().pairFlatMapFunction(new AggregateFunction()));
         //use groupByKey to aggregate all values( the format will be ((all),{(sampleEle1),(sampleEle2),...} )
         JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new ToValueFunction());
         return  groupByKey.rdd();
     }
 
 
-    private static class MergeFunction implements org.apache.spark.api.java.function.Function2<Tuple, Tuple, Tuple>
+    private static class MergeFunction implements Function2<Tuple, Tuple, Tuple>
             , Serializable {
 
         @Override
@@ -89,7 +88,7 @@ public class SparkSampleSortConverter im
     // input: Tuple2<Tuple,Object>
     // output: Tuple2("all", Tuple)
     private static class AggregateFunction implements
-            PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>, String,Tuple>, Serializable {
+            PairFlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, String,Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple2<String,Tuple>> {
 
@@ -111,8 +110,8 @@ public class SparkSampleSortConverter im
         }
 
         @Override
-        public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
-            return new Tuple2TransformIterable(input);
+        public Iterator<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
+            return new Tuple2TransformIterable(input).iterator();
         }
 
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Jul 19 01:32:41 2017
@@ -25,9 +25,8 @@ import java.util.List;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.*;
 import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 public class StreamConverter implements
@@ -35,44 +34,40 @@ public class StreamConverter implements
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-            POStream poStream) throws IOException {
+                              POStream poStream) throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         StreamFunction streamFunction = new StreamFunction(poStream);
-        return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+        return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(streamFunction), true).rdd();
     }
 
     private static class StreamFunction implements
-            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+            FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
         private POStream poStream;
 
         private StreamFunction(POStream poStream) {
             this.poStream = poStream;
         }
 
-        public Iterable<Tuple> call(final Iterator<Tuple> input) {
-            return new Iterable<Tuple>() {
+        @Override
+        public Iterator<Tuple> call(final Iterator<Tuple> input) {
+            return new OutputConsumerIterator(input) {
+
+                @Override
+                protected void attach(Tuple tuple) {
+                    poStream.setInputs(null);
+                    poStream.attachInput(tuple);
+                }
+
                 @Override
-                public Iterator<Tuple> iterator() {
-                    return new OutputConsumerIterator(input) {
+                protected Result getNextResult() throws ExecException {
+                    Result result = poStream.getNextTuple();
+                    return result;
+                }
 
-                        @Override
-                        protected void attach(Tuple tuple) {
-                            poStream.setInputs(null);
-                            poStream.attachInput(tuple);
-                        }
-
-                        @Override
-                        protected Result getNextResult() throws ExecException {
-                            Result result = poStream.getNextTuple();
-                            return result;
-                        }
-
-                        @Override
-                        protected void endOfInput() {
-                            poStream.setFetchable(true);
-                        }
-                    };
+                @Override
+                protected void endOfInput() {
+                    poStream.setFetchable(true);
                 }
             };
         }

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+import scala.Option;
+
+import java.util.List;
+import java.util.Map;
+/** SparkJobStats variant whose task metrics expose input, output and shuffle data as optional (isEmpty()/get()) values; presumably targets the Spark 1.x API - TODO confirm. */
+public class Spark1JobStats extends SparkJobStats {
+    public Spark1JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan, conf);
+    }
+    // Both constructors only forward their arguments to SparkJobStats; this one takes the job id as a String.
+    public Spark1JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+        super(jobId, plan, conf);
+    }
+    /** Adds up the metrics of all tasks in all stage lists of {@code jobMetric} and returns the totals keyed by metric name, in insertion order. */
+    @Override
+    protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+        // Running totals; each one is accumulated over every non-null TaskMetrics in the loop below.
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+        boolean inputMetricExist = false; // the four *Exist flags record whether any task carried that optional metric group
+        boolean outputMetricExist = false;
+        boolean shuffleReadMetricExist = false;
+        boolean shuffleWriteMetricExist = false;
+        // Null stage lists and null task entries are skipped rather than treated as errors.
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        if (!taskMetrics.inputMetrics().isEmpty()) {
+                            inputMetricExist = true;
+                            bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+                        }
+
+                        if (!taskMetrics.outputMetrics().isEmpty()) {
+                            outputMetricExist = true;
+                            bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
+                        }
+                        // Shuffle metrics are scala Option values; each Option is fetched once and only read when non-empty.
+                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+                        if (!shuffleReadMetricsOption.isEmpty()) {
+                            shuffleReadMetricExist = true;
+                            remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+                            localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+                            fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+                            remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+                        }
+
+                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+                        if (!shuffleWriteMetricsOption.isEmpty()) {
+                            shuffleWriteMetricExist = true;
+                            shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+                            shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+                        }
+
+                    }
+                }
+            }
+        }
+        // These totals are always reported. NOTE(review): the "ExcutorDeserializeTime" key is misspelled; confirm no consumer relies on it before renaming.
+        results.put("ExcutorDeserializeTime", executorDeserializeTime);
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+        if (inputMetricExist) { // optional groups below are reported only if at least one task carried them
+            results.put("BytesRead", bytesRead);
+            hdfsBytesRead = bytesRead; // the read total is also stored in hdfsBytesRead and added to the HDFS_BYTES_READ counter
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+        }
+
+        if (outputMetricExist) {
+            results.put("BytesWritten", bytesWritten);
+            hdfsBytesWritten = bytesWritten; // the written total is also stored in hdfsBytesWritten and added to the HDFS_BYTES_WRITTEN counter
+            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+        }
+
+        if (shuffleReadMetricExist) {
+            results.put("RemoteBlocksFetched", remoteBlocksFetched);
+            results.put("LocalBlocksFetched", localBlocksFetched);
+            results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+            results.put("FetchWaitTime", fetchWaitTime);
+            results.put("RemoteBytesRead", remoteBytesRead);
+        }
+
+        if (shuffleWriteMetricExist) {
+            results.put("ShuffleBytesWritten", shuffleBytesWritten);
+            results.put("ShuffleWriteTime", shuffleWriteTime);
+        }
+
+        return results;
+    }
+}

Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import java.util.List;
+import java.util.Map;
+
+public class Spark2JobStats extends SparkJobStats { // SparkJobStats subclass supplying combineTaskMetrics
+    public Spark2JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) { // numeric job id
+        super(jobId, plan, conf);
+    }
+
+    public Spark2JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) { // String job id
+        super(jobId, plan, conf);
+    }
+    /** Sums the per-task metrics of every stage in {@code jobMetric} into one name-to-total map. */
+    @Override
+    protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+        Map<String, Long> results = Maps.newLinkedHashMap();
+        // NOTE(review): uses the inherited 'counters' unconditionally; assumes it is non-null by now - confirm.
+        long executorDeserializeTime = 0;
+        long executorRunTime = 0;
+        long resultSize = 0;
+        long jvmGCTime = 0;
+        long resultSerializationTime = 0;
+        long memoryBytesSpilled = 0;
+        long diskBytesSpilled = 0;
+        long bytesRead = 0;
+        long bytesWritten = 0;
+        long remoteBlocksFetched = 0;
+        long localBlocksFetched = 0;
+        long fetchWaitTime = 0;
+        long remoteBytesRead = 0;
+        long shuffleBytesWritten = 0;
+        long shuffleWriteTime = 0;
+        // Accumulate over every task of every stage; null stage lists and null task entries are skipped.
+        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+            if (stageMetric != null) {
+                for (TaskMetrics taskMetrics : stageMetric) {
+                    if (taskMetrics != null) {
+                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
+                        executorRunTime += taskMetrics.executorRunTime();
+                        resultSize += taskMetrics.resultSize();
+                        jvmGCTime += taskMetrics.jvmGCTime();
+                        resultSerializationTime += taskMetrics.resultSerializationTime();
+                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
+                        bytesRead += taskMetrics.inputMetrics().bytesRead(); // dereferenced directly, no presence check
+
+                        bytesWritten += taskMetrics.outputMetrics().bytesWritten(); // likewise
+                        // Shuffle-read totals; despite the "Option" suffix this is a plain ShuffleReadMetrics.
+                        ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+                        remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched();
+                        localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched();
+                        fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime();
+                        remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead();
+                        // Shuffle-write totals; likewise a plain ShuffleWriteMetrics.
+                        ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+                        shuffleBytesWritten += shuffleWriteMetricsOption.shuffleBytesWritten();
+                        shuffleWriteTime += shuffleWriteMetricsOption.shuffleWriteTime();
+                    }
+                }
+            }
+        }
+        // Publish the totals; every key below is always emitted, even when its total is 0.
+        results.put("ExcutorDeserializeTime", executorDeserializeTime); // key spelling ("Excutor") is a runtime string
+        results.put("ExecutorRunTime", executorRunTime);
+        results.put("ResultSize", resultSize);
+        results.put("JvmGCTime", jvmGCTime);
+        results.put("ResultSerializationTime", resultSerializationTime);
+        results.put("MemoryBytesSpilled", memoryBytesSpilled);
+        results.put("DiskBytesSpilled", diskBytesSpilled);
+        // Bytes read: recorded in the result map, the hdfsBytesRead field and the FS counter group.
+        results.put("BytesRead", bytesRead);
+        hdfsBytesRead = bytesRead;
+        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+        // Bytes written: recorded in the result map, the hdfsBytesWritten field and the FS counter group.
+        results.put("BytesWritten", bytesWritten);
+        hdfsBytesWritten = bytesWritten;
+        counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+        // Shuffle-read totals; TotalBlocksFetched is derived as local + remote.
+        results.put("RemoteBlocksFetched", remoteBlocksFetched);
+        results.put("LocalBlocksFetched", localBlocksFetched);
+        results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+        results.put("FetchWaitTime", fetchWaitTime);
+        results.put("RemoteBytesRead", remoteBytesRead);
+        // Shuffle-write totals.
+        results.put("ShuffleBytesWritten", shuffleBytesWritten);
+        results.put("ShuffleWriteTime", shuffleWriteTime);
+
+        return results;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Jul 19 01:32:41 2017
@@ -21,30 +21,30 @@ package org.apache.pig.tools.pigstats.sp
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.tools.pigstats.*;
-import scala.Option;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.spark.executor.ShuffleReadMetrics;
-import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.spark.executor.TaskMetrics;
 
 import com.google.common.collect.Maps;
 
-public class SparkJobStats extends JobStats {
+public abstract class SparkJobStats extends JobStats {
 
     private int jobId;
     private Map<String, Long> stats = Maps.newLinkedHashMap();
     private boolean disableCounter;
-    private Counters counters = null;
+    protected Counters counters = null;
     public static String FS_COUNTER_GROUP = "FS_GROUP";
     private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null;
 
@@ -58,6 +58,7 @@ public class SparkJobStats extends JobSt
         setConf(conf);
     }
 
+    @Override
     public void setConf(Configuration conf) {
         super.setConf(conf);
         disableCounter = conf.getBoolean("pig.disable.counter", false);
@@ -65,7 +66,7 @@ public class SparkJobStats extends JobSt
     }
 
     public void addOutputInfo(POStore poStore, boolean success,
-                              JobMetricsListener jobMetricsListener) {
+                              JobStatisticCollector jobStatisticCollector) {
         if (!poStore.isTmpStore()) {
             long bytes = getOutputSize(poStore, conf);
             long recordsCount = -1;
@@ -99,9 +100,9 @@ public class SparkJobStats extends JobSt
         inputs.add(inputStats);
     }
 
-    public void collectStats(JobMetricsListener jobMetricsListener) {
-        if (jobMetricsListener != null) {
-            Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+    public void collectStats(JobStatisticCollector jobStatisticCollector) {
+        if (jobStatisticCollector != null) {
+            Map<String, List<TaskMetrics>> taskMetrics = jobStatisticCollector.getJobMetric(jobId);
             if (taskMetrics == null) {
                 throw new RuntimeException("No task metrics available for jobId " + jobId);
             }
@@ -109,110 +110,12 @@ public class SparkJobStats extends JobSt
         }
     }
 
+    protected abstract Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric);
+
     public Map<String, Long> getStats() {
         return stats;
     }
 
-    private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
-        Map<String, Long> results = Maps.newLinkedHashMap();
-
-        long executorDeserializeTime = 0;
-        long executorRunTime = 0;
-        long resultSize = 0;
-        long jvmGCTime = 0;
-        long resultSerializationTime = 0;
-        long memoryBytesSpilled = 0;
-        long diskBytesSpilled = 0;
-        long bytesRead = 0;
-        long bytesWritten = 0;
-        long remoteBlocksFetched = 0;
-        long localBlocksFetched = 0;
-        long fetchWaitTime = 0;
-        long remoteBytesRead = 0;
-        long shuffleBytesWritten = 0;
-        long shuffleWriteTime = 0;
-        boolean inputMetricExist = false;
-        boolean outputMetricExist = false;
-        boolean shuffleReadMetricExist = false;
-        boolean shuffleWriteMetricExist = false;
-
-        for (List<TaskMetrics> stageMetric : jobMetric.values()) {
-            if (stageMetric != null) {
-                for (TaskMetrics taskMetrics : stageMetric) {
-                    if (taskMetrics != null) {
-                        executorDeserializeTime += taskMetrics.executorDeserializeTime();
-                        executorRunTime += taskMetrics.executorRunTime();
-                        resultSize += taskMetrics.resultSize();
-                        jvmGCTime += taskMetrics.jvmGCTime();
-                        resultSerializationTime += taskMetrics.resultSerializationTime();
-                        memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
-                        diskBytesSpilled += taskMetrics.diskBytesSpilled();
-                        if (!taskMetrics.inputMetrics().isEmpty()) {
-                            inputMetricExist = true;
-                            bytesRead += taskMetrics.inputMetrics().get().bytesRead();
-                        }
-
-                        if (!taskMetrics.outputMetrics().isEmpty()) {
-                            outputMetricExist = true;
-                            bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
-                        }
-
-                        Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
-                        if (!shuffleReadMetricsOption.isEmpty()) {
-                            shuffleReadMetricExist = true;
-                            remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
-                            localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
-                            fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
-                            remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
-                        }
-
-                        Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
-                        if (!shuffleWriteMetricsOption.isEmpty()) {
-                            shuffleWriteMetricExist = true;
-                            shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
-                            shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
-                        }
-
-                    }
-                }
-            }
-        }
-
-        results.put("EexcutorDeserializeTime", executorDeserializeTime);
-        results.put("ExecutorRunTime", executorRunTime);
-        results.put("ResultSize", resultSize);
-        results.put("JvmGCTime", jvmGCTime);
-        results.put("ResultSerializationTime", resultSerializationTime);
-        results.put("MemoryBytesSpilled", memoryBytesSpilled);
-        results.put("DiskBytesSpilled", diskBytesSpilled);
-        if (inputMetricExist) {
-            results.put("BytesRead", bytesRead);
-            hdfsBytesRead = bytesRead;
-            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
-        }
-
-        if (outputMetricExist) {
-            results.put("BytesWritten", bytesWritten);
-            hdfsBytesWritten = bytesWritten;
-            counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
-        }
-
-        if (shuffleReadMetricExist) {
-            results.put("RemoteBlocksFetched", remoteBlocksFetched);
-            results.put("LocalBlocksFetched", localBlocksFetched);
-            results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
-            results.put("FetchWaitTime", fetchWaitTime);
-            results.put("RemoteBytesRead", remoteBytesRead);
-        }
-
-        if (shuffleWriteMetricExist) {
-            results.put("ShuffleBytesWritten", shuffleBytesWritten);
-            results.put("ShuffleWriteTime", shuffleWriteTime);
-        }
-
-        return results;
-    }
-
     @Override
     public String getJobId() {
         return String.valueOf(jobId);

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Jul 19 01:32:41 2017
@@ -32,7 +32,8 @@ import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -69,14 +70,14 @@ public class SparkPigStats extends PigSt
     }
 
     public void addJobStats(POStore poStore, SparkOperator sparkOperator, int jobId,
-                            JobMetricsListener jobMetricsListener,
+                            JobStatisticCollector jobStatisticCollector,
                             JavaSparkContext sparkContext) {
         boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
-        jobStats.collectStats(jobMetricsListener);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
-        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobStatisticCollector);
+        jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobStatisticCollector, conf);
         jobStats.initWarningCounters();
         jobSparkOperatorMap.put(jobStats, sparkOperator);
 
@@ -85,22 +86,22 @@ public class SparkPigStats extends PigSt
 
 
     public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId,
-                                JobMetricsListener jobMetricsListener,
+                                JobStatisticCollector jobStatisticCollector,
                                 JavaSparkContext sparkContext,
                                 Exception e) {
         boolean isSuccess = false;
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
-        jobStats.collectStats(jobMetricsListener);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
-        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobStatisticCollector);
+        jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobStatisticCollector, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
         jobStats.setBackendException(e);
     }
 
     public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
-        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+        SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
         jobStats.setSuccessful(isSuccess);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
@@ -229,7 +230,7 @@ public class SparkPigStats extends PigSt
     private void addInputInfoForSparkOper(SparkOperator sparkOperator,
                                           SparkJobStats jobStats,
                                           boolean isSuccess,
-                                          JobMetricsListener jobMetricsListener,
+                                          JobStatisticCollector jobStatisticCollector,
                                           Configuration conf) {
         //to avoid repetition
         if (sparkOperatorsSet.contains(sparkOperator)) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Jul 19 01:32:41 2017
@@ -26,7 +26,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
@@ -44,7 +44,7 @@ public class SparkStatsUtil {
 
     public static void waitForJobAddStats(int jobID,
                                           POStore poStore, SparkOperator sparkOperator,
-                                          JobMetricsListener jobMetricsListener,
+                                          JobStatisticCollector jobStatisticCollector,
                                           JavaSparkContext sparkContext,
                                           SparkPigStats sparkPigStats)
             throws InterruptedException {
@@ -55,20 +55,17 @@ public class SparkStatsUtil {
         // "event bus" thread updating it's internal listener and
         // this driver thread calling SparkStatusTracker.
         // To workaround this, we will wait for this job to "finish".
-        jobMetricsListener.waitForJobToEnd(jobID);
-        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
+        jobStatisticCollector.waitForJobToEnd(jobID);
+        sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobStatisticCollector,
                 sparkContext);
-        jobMetricsListener.cleanup(jobID);
+        jobStatisticCollector.cleanup(jobID);
     }
 
     public static void addFailJobStats(String jobID,
                                        POStore poStore, SparkOperator sparkOperator,
                                        SparkPigStats sparkPigStats,
                                        Exception e) {
-        JobMetricsListener jobMetricsListener = null;
-        JavaSparkContext sparkContext = null;
-        sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
-                sparkContext, e);
+        sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, null, null, e);
     }
 
     public static String getCounterName(POStore store) {

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jul 19 01:32:41 2017
@@ -60,7 +60,6 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
-import org.apache.pig.tools.pigstats.spark.SparkJobStats;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;