You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/11/04 03:46:06 UTC

svn commit: r1767986 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java test/org/apache/pig/test/TestBuiltin.java

Author: xuefu
Date: Fri Nov  4 03:46:06 2016
New Revision: 1767986

URL: http://svn.apache.org/viewvc?rev=1767986&view=rev
Log:
PIG-5051: Initialize PigConstants.TASK_INDEX in spark mode correctly (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Fri Nov  4 03:46:06 2016
@@ -94,7 +94,6 @@ public class SparkUtil {
         jobConf.set(PigConstants.LOCAL_CODE_DIR,
                 System.getProperty("java.io.tmpdir"));
         jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
-        jobConf.set(PigConstants.TASK_INDEX, "0");
 
         LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
                 physicalPlan, POStore.class);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri Nov  4 03:46:06 2016
@@ -25,7 +25,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.PigConstants;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
 import org.apache.pig.impl.util.UDFContext;
@@ -53,6 +55,7 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
 import org.apache.spark.rdd.RDD;
 
 import com.google.common.collect.Lists;
@@ -157,6 +160,7 @@ public class LoadConverter implements RD
         private SparkCounters sparkCounters;
         private boolean disableCounter;
         private SparkEngineConf sparkEngineConf;
+        private boolean initialized;
 
         public ToTupleFunction(SparkEngineConf sparkEngineConf){
                this.sparkEngineConf = sparkEngineConf;
@@ -165,6 +169,11 @@ public class LoadConverter implements RD
 
         @Override
         public Tuple apply(Tuple2<Text, Tuple> v1) {
+            if (!initialized) {
+                long partitionId = TaskContext.get().partitionId();
+                PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+                initialized = true;
+            }
             if (sparkCounters != null && disableCounter == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }

Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Nov  4 03:46:06 2016
@@ -3229,8 +3229,6 @@ public class TestBuiltin {
             assertEquals(iter.next().get(1), "1-3");
             assertEquals(iter.next().get(1), "1-4");
         } else{
-            //because we set PigConstants.TASK_INDEX as 0 in ForEachConverter#ForEachFunction#initializeJobConf
-            //UniqueID.exec() will output like 0-*
             //there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes)
             //Split0:
             //            1\n
@@ -3252,10 +3250,10 @@ public class TestBuiltin {
             assertEquals(iter.next().get(1), "0-3");
             assertEquals(iter.next().get(1), "0-4");
             assertEquals(iter.next().get(1), "0-5");
-            assertEquals(iter.next().get(1), "0-0");
-            assertEquals(iter.next().get(1), "0-1");
-            assertEquals(iter.next().get(1), "0-2");
-            assertEquals(iter.next().get(1), "0-3");
+            assertEquals(iter.next().get(1), "1-0");
+            assertEquals(iter.next().get(1), "1-1");
+            assertEquals(iter.next().get(1), "1-2");
+            assertEquals(iter.next().get(1), "1-3");
         }
         Util.deleteFile(cluster, inputFileName);
     }