Posted to commits@pig.apache.org by zl...@apache.org on 2017/07/19 01:32:42 UTC
svn commit: r1802347 [2/2] - in /pig/trunk: ./ ivy/
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
src/org/apache/pig/tools/pigstats/spark/ test/org/apache/pig/test/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Wed Jul 19 01:32:41 2017
@@ -25,11 +25,12 @@ import java.util.List;
import java.util.Map;
import java.util.HashMap;
-import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.util.Pair;
@@ -54,7 +55,6 @@ import org.apache.pig.impl.plan.PlanExce
import org.apache.pig.impl.util.MultiMap;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
public class SkewedJoinConverter implements
@@ -103,7 +103,7 @@ public class SkewedJoinConverter impleme
// with partition id
StreamPartitionIndexKeyFunction streamFun = new StreamPartitionIndexKeyFunction(this, keyDist, defaultParallelism);
- JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(streamFun);
+ JavaRDD<Tuple2<PartitionIndexedKey, Tuple>> streamIdxKeyJavaRDD = rdd2.toJavaRDD().flatMap(SparkShims.getInstance().flatMapFunction(streamFun));
// Tuple2 RDD to Pair RDD
JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD = new JavaPairRDD<PartitionIndexedKey, Tuple>(
@@ -146,7 +146,7 @@ public class SkewedJoinConverter impleme
* @param <R> be generic because it can be Optional<Tuple> or Tuple
*/
private static class ToValueFunction<L, R> implements
- FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
private boolean[] innerFlags;
private int[] schemaSize;
@@ -188,7 +188,7 @@ public class SkewedJoinConverter impleme
Tuple leftTuple = tf.newTuple();
if (!innerFlags[0]) {
// left should be Optional<Tuple>
- Optional<Tuple> leftOption = (Optional<Tuple>) left;
+ SparkShims.OptionalWrapper<L> leftOption = SparkShims.getInstance().wrapOptional(left);
if (!leftOption.isPresent()) {
// Add an empty left record for RIGHT OUTER JOIN.
// Notice: if it is a skewed, only join the first reduce key
@@ -200,7 +200,7 @@ public class SkewedJoinConverter impleme
return this.next();
}
} else {
- leftTuple = leftOption.get();
+ leftTuple = (Tuple) leftOption.get();
}
} else {
leftTuple = (Tuple) left;
@@ -212,13 +212,13 @@ public class SkewedJoinConverter impleme
Tuple rightTuple = tf.newTuple();
if (!innerFlags[1]) {
// right should be Optional<Tuple>
- Optional<Tuple> rightOption = (Optional<Tuple>) right;
+ SparkShims.OptionalWrapper<R> rightOption = SparkShims.getInstance().wrapOptional(right);
if (!rightOption.isPresent()) {
for (int i = 0; i < schemaSize[1]; i++) {
rightTuple.append(null);
}
} else {
- rightTuple = rightOption.get();
+ rightTuple = (Tuple) rightOption.get();
}
} else {
rightTuple = (Tuple) right;
@@ -234,17 +234,17 @@ public class SkewedJoinConverter impleme
return result;
} catch (Exception e) {
log.warn(e);
+ return null;
}
- return null;
}
};
}
}
@Override
- public Iterable<Tuple> call(
- Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
- return new Tuple2TransformIterable(input);
+ public Iterator<Tuple> call(
+ Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) throws Exception {
+ return new Tuple2TransformIterable(input).iterator();
}
private boolean isFirstReduceKey(PartitionIndexedKey pKey) {
@@ -413,7 +413,7 @@ public class SkewedJoinConverter impleme
return tuple_KeyValue;
} catch (Exception e) {
- System.out.print(e);
+ log.warn(e);
return null;
}
}
@@ -469,7 +469,7 @@ public class SkewedJoinConverter impleme
* <p>
* see: https://wiki.apache.org/pig/PigSkewedJoinSpec
*/
- private static class StreamPartitionIndexKeyFunction implements FlatMapFunction<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
+ private static class StreamPartitionIndexKeyFunction implements FlatMapFunctionAdapter<Tuple, Tuple2<PartitionIndexedKey, Tuple>> {
private SkewedJoinConverter poSkewedJoin;
private final Broadcast<List<Tuple>> keyDist;
@@ -487,7 +487,8 @@ public class SkewedJoinConverter impleme
this.defaultParallelism = defaultParallelism;
}
- public Iterable<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception {
+ @Override
+ public Iterator<Tuple2<PartitionIndexedKey, Tuple>> call(Tuple tuple) throws Exception {
if (!initialized) {
Integer[] reducers = new Integer[1];
reducerMap = loadKeyDistribution(keyDist, reducers);
@@ -526,12 +527,12 @@ public class SkewedJoinConverter impleme
l.add(new Tuple2(pIndexKey, tuple));
}
- return l;
+ return l.iterator();
}
}
/**
- * user defined spark partitioner for skewed join
+ * User defined spark partitioner for skewed join
*/
private static class SkewedJoinPartitioner extends Partitioner {
private int numPartitions;
@@ -568,12 +569,8 @@ public class SkewedJoinConverter impleme
}
/**
- * use parallelism from keyDist or the default parallelism to
+ * Use parallelism from keyDist or the default parallelism to
* create user defined partitioner
- *
- * @param keyDist
- * @param defaultParallelism
- * @return
*/
private SkewedJoinPartitioner buildPartitioner(Broadcast<List<Tuple>> keyDist, Integer defaultParallelism) {
Integer parallelism = -1;
@@ -588,12 +585,7 @@ public class SkewedJoinConverter impleme
}
/**
- * do all kinds of Join (inner, left outer, right outer, full outer)
- *
- * @param skewIndexedJavaPairRDD
- * @param streamIndexedJavaPairRDD
- * @param partitioner
- * @return
+ * Do all kinds of Join (inner, left outer, right outer, full outer)
*/
private JavaRDD<Tuple> doJoin(
JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD,
@@ -616,25 +608,22 @@ public class SkewedJoinConverter impleme
JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
join(streamIndexedJavaPairRDD, partitioner);
- return resultKeyValue.mapPartitions(toValueFun);
+ return resultKeyValue.mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
} else if (innerFlags[0] && !innerFlags[1]) {
// left outer join
- JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
- leftOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
- return resultKeyValue.mapPartitions(toValueFun);
+ return skewIndexedJavaPairRDD
+ .leftOuterJoin(streamIndexedJavaPairRDD, partitioner)
+ .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
} else if (!innerFlags[0] && innerFlags[1]) {
// right outer join
- JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
- rightOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
- return resultKeyValue.mapPartitions(toValueFun);
+ return skewIndexedJavaPairRDD
+ .rightOuterJoin(streamIndexedJavaPairRDD, partitioner)
+ .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
} else {
// full outer join
- JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
- fullOuterJoin(streamIndexedJavaPairRDD, partitioner);
-
- return resultKeyValue.mapPartitions(toValueFun);
+ return skewIndexedJavaPairRDD
+ .fullOuterJoin(streamIndexedJavaPairRDD, partitioner)
+ .mapPartitions(SparkShims.getInstance().flatMapFunction(toValueFun));
}
}
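For reference, the FlatMapFunctionAdapter type that the converters now implement is added elsewhere in this commit and is not shown in these hunks; judging only from the call() overrides above, a minimal sketch of its shape would be roughly:

    // Minimal sketch only -- the real definition lives in
    // org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter,
    // outside the hunks shown here; the signature is inferred from the
    // call sites above and is an assumption, not the committed code.
    public interface FlatMapFunctionAdapter<T, R> {
        java.util.Iterator<R> call(T input) throws Exception;
    }

SparkShims.getInstance().flatMapFunction(...) then wraps such an adapter in the FlatMapFunction type of whichever Spark version is on the classpath.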
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Wed Jul 19 01:32:41 2017
@@ -22,25 +22,26 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.backend.hadoop.executionengine.spark.FlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@SuppressWarnings("serial")
public class SortConverter implements RDDConverter<Tuple, Tuple, POSort> {
private static final Log LOG = LogFactory.getLog(SortConverter.class);
- private static final FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
+ private static final FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POSort sortOperator)
@@ -57,13 +58,13 @@ public class SortConverter implements RD
JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
sortOperator.getMComparator(), true, parallelism);
- JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
+ JavaRDD<Tuple> mapped = sorted.mapPartitions(SparkShims.getInstance().flatMapFunction(TO_VALUE_FUNCTION));
return mapped.rdd();
}
private static class ToValueFunction implements
- FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
@@ -84,8 +85,8 @@ public class SortConverter implements RD
}
@Override
- public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
- return new Tuple2TransformIterable(input);
+ public Iterator<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+ return new Tuple2TransformIterable(input).iterator();
}
}
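The reason for the adapter indirection is the API change between Spark releases: in Spark 1.x, org.apache.spark.api.java.function.FlatMapFunction.call() returns an Iterable, while in Spark 2.x it returns an Iterator. A hypothetical Spark 2 flavour of the wrapper could look like the sketch below (the actual version-specific SparkShims implementations are part of this commit but not shown in this diff):

    // Hypothetical sketch of a Spark 2 shim method, assuming a Spark2Shims-style
    // class exists elsewhere in this commit; names and placement are assumptions.
    public <T, R> org.apache.spark.api.java.function.FlatMapFunction<T, R> flatMapFunction(
            final FlatMapFunctionAdapter<T, R> adapter) {
        return new org.apache.spark.api.java.function.FlatMapFunction<T, R>() {
            @Override
            public java.util.Iterator<R> call(T t) throws Exception {
                // Spark 2.x expects an Iterator, which the adapter already produces
                return adapter.call(t);
            }
        };
    }

A Spark 1 counterpart would instead have to expose the adapter's Iterator as an Iterable, since FlatMapFunction.call() returns Iterable there.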
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java Wed Jul 19 01:32:41 2017
@@ -19,18 +19,18 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.backend.hadoop.executionengine.spark.PairFlatMapFunctionAdapter;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POSampleSortSpark;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -40,12 +40,11 @@ import org.apache.pig.data.TupleFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.rdd.RDD;
- /*
- sort the sample data and convert the sample data to the format (all,{(sampleEle1),(sampleEle2),...})
- */
+/*
+ sort the sample data and convert the sample data to the format (all,{(sampleEle1),(sampleEle2),...})
+ */
@SuppressWarnings("serial")
public class SparkSampleSortConverter implements RDDConverter<Tuple, Tuple, POSampleSortSpark> {
private static final Log LOG = LogFactory.getLog(SparkSampleSortConverter.class);
@@ -66,14 +65,14 @@ public class SparkSampleSortConverter im
//sort sample data
JavaPairRDD<Tuple, Object> sorted = r.sortByKey(true);
//convert every element in sample data from element to (all, element) format
- JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(new AggregateFunction());
+ JavaPairRDD<String, Tuple> mapped = sorted.mapPartitionsToPair(SparkShims.getInstance().pairFlatMapFunction(new AggregateFunction()));
//use groupByKey to aggregate all values( the format will be ((all),{(sampleEle1),(sampleEle2),...} )
JavaRDD<Tuple> groupByKey= mapped.groupByKey().map(new ToValueFunction());
return groupByKey.rdd();
}
- private static class MergeFunction implements org.apache.spark.api.java.function.Function2<Tuple, Tuple, Tuple>
+ private static class MergeFunction implements Function2<Tuple, Tuple, Tuple>
, Serializable {
@Override
@@ -89,7 +88,7 @@ public class SparkSampleSortConverter im
// input: Tuple2<Tuple,Object>
// output: Tuple2("all", Tuple)
private static class AggregateFunction implements
- PairFlatMapFunction<Iterator<Tuple2<Tuple, Object>>, String,Tuple>, Serializable {
+ PairFlatMapFunctionAdapter<Iterator<Tuple2<Tuple, Object>>, String,Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple2<String,Tuple>> {
@@ -111,8 +110,8 @@ public class SparkSampleSortConverter im
}
@Override
- public Iterable<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
- return new Tuple2TransformIterable(input);
+ public Iterator<Tuple2<String, Tuple>> call(Iterator<Tuple2<Tuple, Object>> input) throws Exception {
+ return new Tuple2TransformIterable(input).iterator();
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Jul 19 01:32:41 2017
@@ -25,9 +25,8 @@ import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.*;
import org.apache.pig.data.Tuple;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
public class StreamConverter implements
@@ -35,44 +34,40 @@ public class StreamConverter implements
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POStream poStream) throws IOException {
+ POStream poStream) throws IOException {
SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
RDD<Tuple> rdd = predecessors.get(0);
StreamFunction streamFunction = new StreamFunction(poStream);
- return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+ return rdd.toJavaRDD().mapPartitions(SparkShims.getInstance().flatMapFunction(streamFunction), true).rdd();
}
private static class StreamFunction implements
- FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+ FlatMapFunctionAdapter<Iterator<Tuple>, Tuple>, Serializable {
private POStream poStream;
private StreamFunction(POStream poStream) {
this.poStream = poStream;
}
- public Iterable<Tuple> call(final Iterator<Tuple> input) {
- return new Iterable<Tuple>() {
+ @Override
+ public Iterator<Tuple> call(final Iterator<Tuple> input) {
+ return new OutputConsumerIterator(input) {
+
+ @Override
+ protected void attach(Tuple tuple) {
+ poStream.setInputs(null);
+ poStream.attachInput(tuple);
+ }
+
@Override
- public Iterator<Tuple> iterator() {
- return new OutputConsumerIterator(input) {
+ protected Result getNextResult() throws ExecException {
+ Result result = poStream.getNextTuple();
+ return result;
+ }
- @Override
- protected void attach(Tuple tuple) {
- poStream.setInputs(null);
- poStream.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- Result result = poStream.getNextTuple();
- return result;
- }
-
- @Override
- protected void endOfInput() {
- poStream.setFetchable(true);
- }
- };
+ @Override
+ protected void endOfInput() {
+ poStream.setFetchable(true);
}
};
}
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark1JobStats.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+import scala.Option;
+
+import java.util.List;
+import java.util.Map;
+
+public class Spark1JobStats extends SparkJobStats {
+ public Spark1JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+ super(jobId, plan, conf);
+ }
+
+ public Spark1JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+ super(jobId, plan, conf);
+ }
+
+ @Override
+ protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+ Map<String, Long> results = Maps.newLinkedHashMap();
+
+ long executorDeserializeTime = 0;
+ long executorRunTime = 0;
+ long resultSize = 0;
+ long jvmGCTime = 0;
+ long resultSerializationTime = 0;
+ long memoryBytesSpilled = 0;
+ long diskBytesSpilled = 0;
+ long bytesRead = 0;
+ long bytesWritten = 0;
+ long remoteBlocksFetched = 0;
+ long localBlocksFetched = 0;
+ long fetchWaitTime = 0;
+ long remoteBytesRead = 0;
+ long shuffleBytesWritten = 0;
+ long shuffleWriteTime = 0;
+ boolean inputMetricExist = false;
+ boolean outputMetricExist = false;
+ boolean shuffleReadMetricExist = false;
+ boolean shuffleWriteMetricExist = false;
+
+ for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+ }
+
+ if (!taskMetrics.outputMetrics().isEmpty()) {
+ outputMetricExist = true;
+ bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
+ }
+
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+
+ }
+ }
+ }
+ }
+
+ results.put("ExcutorDeserializeTime", executorDeserializeTime);
+ results.put("ExecutorRunTime", executorRunTime);
+ results.put("ResultSize", resultSize);
+ results.put("JvmGCTime", jvmGCTime);
+ results.put("ResultSerializationTime", resultSerializationTime);
+ results.put("MemoryBytesSpilled", memoryBytesSpilled);
+ results.put("DiskBytesSpilled", diskBytesSpilled);
+ if (inputMetricExist) {
+ results.put("BytesRead", bytesRead);
+ hdfsBytesRead = bytesRead;
+ counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+ }
+
+ if (outputMetricExist) {
+ results.put("BytesWritten", bytesWritten);
+ hdfsBytesWritten = bytesWritten;
+ counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+ }
+
+ if (shuffleReadMetricExist) {
+ results.put("RemoteBlocksFetched", remoteBlocksFetched);
+ results.put("LocalBlocksFetched", localBlocksFetched);
+ results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+ results.put("FetchWaitTime", fetchWaitTime);
+ results.put("RemoteBytesRead", remoteBytesRead);
+ }
+
+ if (shuffleWriteMetricExist) {
+ results.put("ShuffleBytesWritten", shuffleBytesWritten);
+ results.put("ShuffleWriteTime", shuffleWriteTime);
+ }
+
+ return results;
+ }
+}
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java?rev=1802347&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/Spark2JobStats.java Wed Jul 19 01:32:41 2017
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import java.util.List;
+import java.util.Map;
+
+public class Spark2JobStats extends SparkJobStats {
+ public Spark2JobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
+ super(jobId, plan, conf);
+ }
+
+ public Spark2JobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
+ super(jobId, plan, conf);
+ }
+
+ @Override
+ protected Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+ Map<String, Long> results = Maps.newLinkedHashMap();
+
+ long executorDeserializeTime = 0;
+ long executorRunTime = 0;
+ long resultSize = 0;
+ long jvmGCTime = 0;
+ long resultSerializationTime = 0;
+ long memoryBytesSpilled = 0;
+ long diskBytesSpilled = 0;
+ long bytesRead = 0;
+ long bytesWritten = 0;
+ long remoteBlocksFetched = 0;
+ long localBlocksFetched = 0;
+ long fetchWaitTime = 0;
+ long remoteBytesRead = 0;
+ long shuffleBytesWritten = 0;
+ long shuffleWriteTime = 0;
+
+ for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ bytesRead += taskMetrics.inputMetrics().bytesRead();
+
+ bytesWritten += taskMetrics.outputMetrics().bytesWritten();
+
+ ShuffleReadMetrics shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ remoteBlocksFetched += shuffleReadMetricsOption.remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.remoteBytesRead();
+
+ ShuffleWriteMetrics shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ shuffleBytesWritten += shuffleWriteMetricsOption.shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.shuffleWriteTime();
+ }
+ }
+ }
+ }
+
+ results.put("ExcutorDeserializeTime", executorDeserializeTime);
+ results.put("ExecutorRunTime", executorRunTime);
+ results.put("ResultSize", resultSize);
+ results.put("JvmGCTime", jvmGCTime);
+ results.put("ResultSerializationTime", resultSerializationTime);
+ results.put("MemoryBytesSpilled", memoryBytesSpilled);
+ results.put("DiskBytesSpilled", diskBytesSpilled);
+
+ results.put("BytesRead", bytesRead);
+ hdfsBytesRead = bytesRead;
+ counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
+
+ results.put("BytesWritten", bytesWritten);
+ hdfsBytesWritten = bytesWritten;
+ counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
+
+ results.put("RemoteBlocksFetched", remoteBlocksFetched);
+ results.put("LocalBlocksFetched", localBlocksFetched);
+ results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+ results.put("FetchWaitTime", fetchWaitTime);
+ results.put("RemoteBytesRead", remoteBytesRead);
+
+ results.put("ShuffleBytesWritten", shuffleBytesWritten);
+ results.put("ShuffleWriteTime", shuffleWriteTime);
+
+ return results;
+ }
+}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Jul 19 01:32:41 2017
@@ -21,30 +21,30 @@ package org.apache.pig.tools.pigstats.sp
import java.util.List;
import java.util.Map;
-import org.apache.pig.tools.pigstats.*;
-import scala.Option;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.PlanVisitor;
-import org.apache.spark.executor.ShuffleReadMetrics;
-import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.pig.tools.pigstats.InputStats;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.spark.executor.TaskMetrics;
import com.google.common.collect.Maps;
-public class SparkJobStats extends JobStats {
+public abstract class SparkJobStats extends JobStats {
private int jobId;
private Map<String, Long> stats = Maps.newLinkedHashMap();
private boolean disableCounter;
- private Counters counters = null;
+ protected Counters counters = null;
public static String FS_COUNTER_GROUP = "FS_GROUP";
private Map<String, SparkCounter<Map<String, Long>>> warningCounters = null;
@@ -58,6 +58,7 @@ public class SparkJobStats extends JobSt
setConf(conf);
}
+ @Override
public void setConf(Configuration conf) {
super.setConf(conf);
disableCounter = conf.getBoolean("pig.disable.counter", false);
@@ -65,7 +66,7 @@ public class SparkJobStats extends JobSt
}
public void addOutputInfo(POStore poStore, boolean success,
- JobMetricsListener jobMetricsListener) {
+ JobStatisticCollector jobStatisticCollector) {
if (!poStore.isTmpStore()) {
long bytes = getOutputSize(poStore, conf);
long recordsCount = -1;
@@ -99,9 +100,9 @@ public class SparkJobStats extends JobSt
inputs.add(inputStats);
}
- public void collectStats(JobMetricsListener jobMetricsListener) {
- if (jobMetricsListener != null) {
- Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+ public void collectStats(JobStatisticCollector jobStatisticCollector) {
+ if (jobStatisticCollector != null) {
+ Map<String, List<TaskMetrics>> taskMetrics = jobStatisticCollector.getJobMetric(jobId);
if (taskMetrics == null) {
throw new RuntimeException("No task metrics available for jobId " + jobId);
}
@@ -109,110 +110,12 @@ public class SparkJobStats extends JobSt
}
}
+ protected abstract Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric);
+
public Map<String, Long> getStats() {
return stats;
}
- private Map<String, Long> combineTaskMetrics(Map<String, List<TaskMetrics>> jobMetric) {
- Map<String, Long> results = Maps.newLinkedHashMap();
-
- long executorDeserializeTime = 0;
- long executorRunTime = 0;
- long resultSize = 0;
- long jvmGCTime = 0;
- long resultSerializationTime = 0;
- long memoryBytesSpilled = 0;
- long diskBytesSpilled = 0;
- long bytesRead = 0;
- long bytesWritten = 0;
- long remoteBlocksFetched = 0;
- long localBlocksFetched = 0;
- long fetchWaitTime = 0;
- long remoteBytesRead = 0;
- long shuffleBytesWritten = 0;
- long shuffleWriteTime = 0;
- boolean inputMetricExist = false;
- boolean outputMetricExist = false;
- boolean shuffleReadMetricExist = false;
- boolean shuffleWriteMetricExist = false;
-
- for (List<TaskMetrics> stageMetric : jobMetric.values()) {
- if (stageMetric != null) {
- for (TaskMetrics taskMetrics : stageMetric) {
- if (taskMetrics != null) {
- executorDeserializeTime += taskMetrics.executorDeserializeTime();
- executorRunTime += taskMetrics.executorRunTime();
- resultSize += taskMetrics.resultSize();
- jvmGCTime += taskMetrics.jvmGCTime();
- resultSerializationTime += taskMetrics.resultSerializationTime();
- memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
- diskBytesSpilled += taskMetrics.diskBytesSpilled();
- if (!taskMetrics.inputMetrics().isEmpty()) {
- inputMetricExist = true;
- bytesRead += taskMetrics.inputMetrics().get().bytesRead();
- }
-
- if (!taskMetrics.outputMetrics().isEmpty()) {
- outputMetricExist = true;
- bytesWritten += taskMetrics.outputMetrics().get().bytesWritten();
- }
-
- Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
- if (!shuffleReadMetricsOption.isEmpty()) {
- shuffleReadMetricExist = true;
- remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
- localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
- fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
- remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
- }
-
- Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
- if (!shuffleWriteMetricsOption.isEmpty()) {
- shuffleWriteMetricExist = true;
- shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
- shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
- }
-
- }
- }
- }
- }
-
- results.put("EexcutorDeserializeTime", executorDeserializeTime);
- results.put("ExecutorRunTime", executorRunTime);
- results.put("ResultSize", resultSize);
- results.put("JvmGCTime", jvmGCTime);
- results.put("ResultSerializationTime", resultSerializationTime);
- results.put("MemoryBytesSpilled", memoryBytesSpilled);
- results.put("DiskBytesSpilled", diskBytesSpilled);
- if (inputMetricExist) {
- results.put("BytesRead", bytesRead);
- hdfsBytesRead = bytesRead;
- counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
- }
-
- if (outputMetricExist) {
- results.put("BytesWritten", bytesWritten);
- hdfsBytesWritten = bytesWritten;
- counters.incrCounter(FS_COUNTER_GROUP, PigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
- }
-
- if (shuffleReadMetricExist) {
- results.put("RemoteBlocksFetched", remoteBlocksFetched);
- results.put("LocalBlocksFetched", localBlocksFetched);
- results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
- results.put("FetchWaitTime", fetchWaitTime);
- results.put("RemoteBytesRead", remoteBytesRead);
- }
-
- if (shuffleWriteMetricExist) {
- results.put("ShuffleBytesWritten", shuffleBytesWritten);
- results.put("ShuffleWriteTime", shuffleWriteTime);
- }
-
- return results;
- }
-
@Override
public String getJobId() {
return String.valueOf(jobId);
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Jul 19 01:32:41 2017
@@ -32,7 +32,8 @@ import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkShims;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -69,14 +70,14 @@ public class SparkPigStats extends PigSt
}
public void addJobStats(POStore poStore, SparkOperator sparkOperator, int jobId,
- JobMetricsListener jobMetricsListener,
+ JobStatisticCollector jobStatisticCollector,
JavaSparkContext sparkContext) {
boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
- SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+ SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
jobStats.setSuccessful(isSuccess);
- jobStats.collectStats(jobMetricsListener);
- jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
- addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+ jobStats.collectStats(jobStatisticCollector);
+ jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
+ addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobStatisticCollector, conf);
jobStats.initWarningCounters();
jobSparkOperatorMap.put(jobStats, sparkOperator);
@@ -85,22 +86,22 @@ public class SparkPigStats extends PigSt
public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId,
- JobMetricsListener jobMetricsListener,
+ JobStatisticCollector jobStatisticCollector,
JavaSparkContext sparkContext,
Exception e) {
boolean isSuccess = false;
- SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+ SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
jobStats.setSuccessful(isSuccess);
- jobStats.collectStats(jobMetricsListener);
- jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener);
- addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
+ jobStats.collectStats(jobStatisticCollector);
+ jobStats.addOutputInfo(poStore, isSuccess, jobStatisticCollector);
+ addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobStatisticCollector, conf);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
jobStats.setBackendException(e);
}
public void addNativeJobStats(NativeSparkOperator sparkOperator, String jobId, boolean isSuccess, Exception e) {
- SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan, conf);
+ SparkJobStats jobStats = SparkShims.getInstance().sparkJobStats(jobId, jobPlan, conf);
jobStats.setSuccessful(isSuccess);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
@@ -229,7 +230,7 @@ public class SparkPigStats extends PigSt
private void addInputInfoForSparkOper(SparkOperator sparkOperator,
SparkJobStats jobStats,
boolean isSuccess,
- JobMetricsListener jobMetricsListener,
+ JobStatisticCollector jobStatisticCollector,
Configuration conf) {
//to avoid repetition
if (sparkOperatorsSet.contains(sparkOperator)) {
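The SparkShims.getInstance().sparkJobStats(...) calls above presumably select between the Spark1JobStats and Spark2JobStats classes added earlier in this commit. A minimal sketch of such a factory method, assuming a Spark 2 shim implementation (overloads inferred from the int and String jobId call sites; not part of the hunks shown here):

    // Hypothetical factory methods on a Spark 2 shim class; the real SparkShims
    // subclasses are part of this commit but outside this diff.
    public SparkJobStats sparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
        return new Spark2JobStats(jobId, plan, conf);
    }

    public SparkJobStats sparkJobStats(String jobId, PigStats.JobGraph plan, Configuration conf) {
        return new Spark2JobStats(jobId, plan, conf);
    }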
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Jul 19 01:32:41 2017
@@ -26,7 +26,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder;
-import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobStatisticCollector;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.tools.pigstats.PigStatsUtil;
@@ -44,7 +44,7 @@ public class SparkStatsUtil {
public static void waitForJobAddStats(int jobID,
POStore poStore, SparkOperator sparkOperator,
- JobMetricsListener jobMetricsListener,
+ JobStatisticCollector jobStatisticCollector,
JavaSparkContext sparkContext,
SparkPigStats sparkPigStats)
throws InterruptedException {
@@ -55,20 +55,17 @@ public class SparkStatsUtil {
// "event bus" thread updating it's internal listener and
// this driver thread calling SparkStatusTracker.
// To workaround this, we will wait for this job to "finish".
- jobMetricsListener.waitForJobToEnd(jobID);
- sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
+ jobStatisticCollector.waitForJobToEnd(jobID);
+ sparkPigStats.addJobStats(poStore, sparkOperator, jobID, jobStatisticCollector,
sparkContext);
- jobMetricsListener.cleanup(jobID);
+ jobStatisticCollector.cleanup(jobID);
}
public static void addFailJobStats(String jobID,
POStore poStore, SparkOperator sparkOperator,
SparkPigStats sparkPigStats,
Exception e) {
- JobMetricsListener jobMetricsListener = null;
- JavaSparkContext sparkContext = null;
- sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, jobMetricsListener,
- sparkContext, e);
+ sparkPigStats.addFailJobStats(poStore, sparkOperator, jobID, null, null, e);
}
public static String getCounterName(POStore store) {
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1802347&r1=1802346&r2=1802347&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jul 19 01:32:41 2017
@@ -60,7 +60,6 @@ import org.apache.pig.tools.pigstats.Pig
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
-import org.apache.pig.tools.pigstats.spark.SparkJobStats;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;