You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/11/04 03:46:06 UTC
svn commit: r1767986 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
test/org/apache/pig/test/TestBuiltin.java
Author: xuefu
Date: Fri Nov 4 03:46:06 2016
New Revision: 1767986
URL: http://svn.apache.org/viewvc?rev=1767986&view=rev
Log:
PIG-5051: Initialize PigConstants.TASK_INDEX in spark mode correctly (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Fri Nov 4 03:46:06 2016
@@ -94,7 +94,6 @@ public class SparkUtil {
jobConf.set(PigConstants.LOCAL_CODE_DIR,
System.getProperty("java.io.tmpdir"));
jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
- jobConf.set(PigConstants.TASK_INDEX, "0");
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
physicalPlan, POStore.class);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri Nov 4 03:46:06 2016
@@ -25,7 +25,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.pig.PigConstants;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
import org.apache.pig.impl.util.UDFContext;
@@ -53,6 +55,7 @@ import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
import org.apache.spark.rdd.RDD;
import com.google.common.collect.Lists;
@@ -157,6 +160,7 @@ public class LoadConverter implements RD
private SparkCounters sparkCounters;
private boolean disableCounter;
private SparkEngineConf sparkEngineConf;
+ private boolean initialized;
public ToTupleFunction(SparkEngineConf sparkEngineConf){
this.sparkEngineConf = sparkEngineConf;
@@ -165,6 +169,11 @@ public class LoadConverter implements RD
@Override
public Tuple apply(Tuple2<Text, Tuple> v1) {
+ if (!initialized) {
+ long partitionId = TaskContext.get().partitionId();
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId));
+ initialized = true;
+ }
if (sparkCounters != null && disableCounter == false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1767986&r1=1767985&r2=1767986&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Nov 4 03:46:06 2016
@@ -3229,8 +3229,6 @@ public class TestBuiltin {
assertEquals(iter.next().get(1), "1-3");
assertEquals(iter.next().get(1), "1-4");
} else{
- //because we set PigConstants.TASK_INDEX as 0 in ForEachConverter#ForEachFunction#initializeJobConf
- //UniqueID.exec() will output like 0-*
//there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes)
//Split0:
// 1\n
@@ -3252,10 +3250,10 @@ public class TestBuiltin {
assertEquals(iter.next().get(1), "0-3");
assertEquals(iter.next().get(1), "0-4");
assertEquals(iter.next().get(1), "0-5");
- assertEquals(iter.next().get(1), "0-0");
- assertEquals(iter.next().get(1), "0-1");
- assertEquals(iter.next().get(1), "0-2");
- assertEquals(iter.next().get(1), "0-3");
+ assertEquals(iter.next().get(1), "1-0");
+ assertEquals(iter.next().get(1), "1-1");
+ assertEquals(iter.next().get(1), "1-2");
+ assertEquals(iter.next().get(1), "1-3");
}
Util.deleteFile(cluster, inputFileName);
}