You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/01 08:56:54 UTC
svn commit: r1784882 -
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Author: zly
Date: Wed Mar 1 08:56:54 2017
New Revision: 1784882
URL: http://svn.apache.org/viewvc?rev=1784882&view=rev
Log:
PIG-5154: Fix GFCross related issues after merging from trunk to spark (Adam via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1784882&r1=1784881&r2=1784882&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Mar 1 08:56:54 2017
@@ -25,28 +25,26 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.pig.PigConstants;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.spark.SparkCounters;
-import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
-import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark;
import org.apache.pig.data.Tuple;
@@ -54,6 +52,10 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
@@ -170,8 +172,9 @@ public class LoadConverter implements RD
public Tuple apply(Tuple2<Text, Tuple> v1) {
if (!initialized) {
long partitionId = TaskContext.get().partitionId();
- PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
-
+ Configuration jobConf = PigMapReduce.sJobConfInternal.get();
+ jobConf.set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+ jobConf.set(MRConfiguration.TASK_ID, Long.toString(partitionId));
initialized = true;
}
if (sparkCounters != null && disableCounter == false) {