Posted to commits@pig.apache.org by ro...@apache.org on 2016/10/17 15:17:34 UTC

svn commit: r1765307 - in /pig/branches/branch-0.16: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/pe...

Author: rohini
Date: Mon Oct 17 15:17:34 2016
New Revision: 1765307

URL: http://svn.apache.org/viewvc?rev=1765307&view=rev
Log:
PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)

Modified:
    pig/branches/branch-0.16/CHANGES.txt
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
    pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/GFCross.java
    pig/branches/branch-0.16/src/org/apache/pig/pen/LocalMapReduceSimulator.java
    pig/branches/branch-0.16/test/org/apache/pig/test/TestFindQuantiles.java
    pig/branches/branch-0.16/test/org/apache/pig/test/TestGFCross.java
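
The common thread of the changes below is that each place that previously called new Random() now seeds the Random from the task id found in the job configuration, so a re-execution of the same task makes the same partition choices. A minimal sketch of that seeding scheme, assuming a hypothetical task id string (at runtime the value comes from MRConfiguration.TASK_ID / JobContext.TASK_ID):

    import java.util.Random;

    public class TaskSeededRandomSketch {
        public static void main(String[] args) {
            // hypothetical task id; the real code reads it from the job configuration
            String taskId = "task_1476716254000_0001_m_000003";
            int taskIdHashCode = taskId.hashCode();
            // widen the 32-bit hash into a 64-bit seed by placing it in both halves,
            // the same expression used in the partitioner and GFCross changes below
            long seed = ((long) taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
            Random r = new Random(seed);
            // the same task id always produces the same seed, hence the same draws
            System.out.println(seed + " -> " + r.nextDouble());
        }
    }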

Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Mon Oct 17 15:17:34 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)
+
 PIG-4951: Rename PIG_ATS_ENABLED constant (szita via daijy)
 
 PIG-5035: killJob API does not work in Tez (zjffdu via rohini)

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Mon Oct 17 15:17:34 2016
@@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe
     Random rGen;
     float[] probVec;
     float epsilon = 0.0001f;
-        
+
     private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-    
-    public DiscreteProbabilitySampleGenerator(float[] probVec) {
-        rGen = new Random();
+
+    public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
+        rGen = new Random(seed);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         this.probVec = probVec;
-        if (1-epsilon > sum || sum > 1+epsilon) { 
+        if (1-epsilon > sum || sum > 1+epsilon) {
             LOG.info("Sum of probabilities should be near one: " + sum);
         }
     }
-    
+
     public int getNext(){
         double toss = rGen.nextDouble();
         // if the uniformly random number that I generated
@@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe
             toss -= probVec[i];
             if(toss<=0.0)
                 return i;
-        }        
+        }
         return lastIdx;
     }
-    
+
     public static void main(String[] args) {
         float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
-        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
+        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
         CountingMap<Integer> cm = new CountingMap<Integer>();
         for(int i=0;i<100;i++){
             cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe
     public String toString() {
         return Arrays.toString(probVec);
     }
-    
-    
+
+
 }
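
With the seeded constructor above, two generators built from the same seed and probability vector walk through identical sequences. A small usage sketch, assuming the patched class is on the classpath (the seed and weights are arbitrary examples):

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;

    public class DeterministicDrawsSketch {
        public static void main(String[] args) {
            float[] probVec = { 0.3f, 0.2f, 0.5f };
            DiscreteProbabilitySampleGenerator a =
                    new DiscreteProbabilitySampleGenerator(11317, probVec);
            DiscreteProbabilitySampleGenerator b =
                    new DiscreteProbabilitySampleGenerator(11317, probVec);
            for (int i = 0; i < 5; i++) {
                // identical seeds and weights give identical partition indexes
                System.out.println(a.getNext() == b.getNext()); // prints true
            }
        }
    }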

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Oct 17 15:17:34 2016
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
@@ -129,11 +130,13 @@ public class WeightedRangePartitioner ex
                 DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
                 InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
                 convertToArray(quantilesList);
+                long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
+                long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
                 for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                     Tuple key = (Tuple)ent.getKey(); // sample item which repeats
                     float[] probVec = getProbVec((Tuple)ent.getValue());
                     weightedParts.put(getPigNullableWritable(key),
-                            new DiscreteProbabilitySampleGenerator(probVec));
+                            new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
                 }
             }
             // else - the quantiles file is empty - unless we have a bug, the

Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Mon Oct 17 15:17:34 2016
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -30,6 +31,7 @@ import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -64,11 +66,13 @@ public class WeightedRangePartitionerTez
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
+            long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+            long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
             for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
                 float[] probVec = getProbVec((Tuple) ent.getValue());
                 weightedParts.put(getPigNullableWritable(key),
-                        new DiscreteProbabilitySampleGenerator(probVec));
+                        new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

Modified: pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/GFCross.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/impl/builtin/GFCross.java Mon Oct 17 15:17:34 2016
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -42,7 +43,7 @@ public class GFCross extends EvalFunc<Da
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private int parallelism = 0;
-    private Random r = new Random();
+    private Random r;
     private String crossKey;
 
     static private final int DEFAULT_PARALLELISM = 96;
@@ -69,6 +70,14 @@ public class GFCross extends EvalFunc<Da
                 if (parallelism < 0) {
                     throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
                 }
+                long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode();
+                long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+                r = new Random(seed);
+            } else {
+                // Don't see a case where cfg can be null.
+                // But there is an existing testcase TestGFCross.testDefault
+                // Using constant generated from task_14738102975522_0001_r_000000 hashcode
+                r = new Random(-4235927512599300514L);
             }
 
             numInputs = (Integer)input.get(0);
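
The else branch above falls back to a fixed seed when no configuration is present (the TestGFCross.testDefault case mentioned in the comment). Per that comment, the constant was derived from the hash of task_14738102975522_0001_r_000000 using the same widening expression; a throwaway sketch to recompute it and compare:

    public class FallbackSeedSketch {
        public static void main(String[] args) {
            int h = "task_14738102975522_0001_r_000000".hashCode();
            long seed = ((long) h << 32) | (h & 0xffffffffL);
            // per the code comment above, this should match the hard-coded fallback seed
            System.out.println(seed == -4235927512599300514L);
        }
    }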

Modified: pig/branches/branch-0.16/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/pen/LocalMapReduceSimulator.java Mon Oct 17 15:17:34 2016
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -35,6 +36,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
@@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr
  *
  */
 public class LocalMapReduceSimulator {
-    
+
     private MapReduceLauncher launcher = new MapReduceLauncher();
-    
+
     private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();;
 
     @SuppressWarnings("unchecked")
@@ -88,12 +90,12 @@ public class LocalMapReduceSimulator {
                               PigContext pc) throws PigException, IOException, InterruptedException {
         phyToMRMap.clear();
         MROperPlan mrp = launcher.compile(php, pc);
-                
+
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
+
         JobControl jc;
         int numMRJobsCompl = 0;
         DataBag input;
@@ -106,6 +108,8 @@ public class LocalMapReduceSimulator {
         boolean needFileInput;
         final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
         pc.getProperties().setProperty("pig.illustrating", "true");
+        String jtIdentifier = "" + System.currentTimeMillis();
+        int jobId = 0;
         while(mrp.size() != 0) {
             jc = jcc.compile(mrp, "Illustrator");
             if(jc == null) {
@@ -113,6 +117,7 @@ public class LocalMapReduceSimulator {
             }
             List<Job> jobs = jc.getWaitingJobs();
             for (Job job : jobs) {
+                jobId++;
                 jobConf = job.getJobConf();
                 FileLocalizer.setInitialized(false);
                 ArrayList<ArrayList<OperatorKey>> inpTargets =
@@ -123,14 +128,14 @@ public class LocalMapReduceSimulator {
                 PigSplit split = null;
                 List<POStore> stores = null;
                 PhysicalOperator pack = null;
-                // revisit as there are new physical operators from MR compilation 
+                // revisit as there are new physical operators from MR compilation
                 if (!mro.mapPlan.isEmpty())
                     attacher.revisit(mro.mapPlan);
                 if (!mro.reducePlan.isEmpty()) {
                     attacher.revisit(mro.reducePlan);
                     pack = mro.reducePlan.getRoots().get(0);
                 }
-                
+
                 List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
                 if (!mro.mapPlan.isEmpty()) {
                     stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
@@ -145,10 +150,10 @@ public class LocalMapReduceSimulator {
                 for (POStore store : stores) {
                     output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
                 }
-               
+
                 OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
                 oa.visit();
-                
+
                 if (!mro.reducePlan.isEmpty()) {
                     oa = new OutputAttacher(mro.reducePlan, output);
                     oa.visit();
@@ -168,6 +173,7 @@ public class LocalMapReduceSimulator {
                     if (input != null)
                         mro.mapPlan.remove(ld);
                 }
+                int mapTaskId = 0;
                 for (POLoad ld : lds) {
                     // check newly generated data first
                     input = output.get(ld.getLFile().getFileName());
@@ -180,7 +186,7 @@ public class LocalMapReduceSimulator {
                                      break;
                                 }
                             }
-                        } 
+                        }
                     }
                     needFileInput = (input == null);
                     split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
@@ -199,6 +205,7 @@ public class LocalMapReduceSimulator {
                             context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
                         }
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     } else {
                         if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -210,10 +217,11 @@ public class LocalMapReduceSimulator {
                         Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
                           .getIllustratorContext(jobConf, input, intermediateData, split);
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     }
                 }
-                
+
                 if (!mro.reducePlan.isEmpty())
                 {
                     if (pack instanceof POPackage)
@@ -233,19 +241,20 @@ public class LocalMapReduceSimulator {
                     }
 
                     ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
+                    context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
                     reduce.run(context);
                 }
                 for (PhysicalOperator key : mro.phyToMRMap.keySet())
                     for (PhysicalOperator value : mro.phyToMRMap.get(key))
                         phyToMRMap.put(key, value);
             }
-            
-            
+
+
             int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
-            
+
             numMRJobsCompl += removedMROp;
         }
-                
+
         jcc.reset();
     }
 
@@ -256,7 +265,7 @@ public class LocalMapReduceSimulator {
                     plan));
             this.outputBuffer = output;
         }
-        
+
         @Override
         public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
             if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {
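
The simulator changes above stamp a synthetic task id into the illustrator context so that the task-id-seeded code paths find MRConfiguration.TASK_ID when running under illustrate. A rough sketch of what those ids look like, using the same deprecated org.apache.hadoop.mapred.TaskID constructor as the diff (jtIdentifier is just a timestamp string, as in the patch):

    import org.apache.hadoop.mapred.TaskID;

    public class SyntheticTaskIdSketch {
        public static void main(String[] args) {
            String jtIdentifier = "" + System.currentTimeMillis();
            // map task 0 of the first compiled job, e.g. task_1476716254000_0001_m_000000
            System.out.println(new TaskID(jtIdentifier, 1, true, 0).toString());
            // reduce task 0 of the same job, e.g. task_1476716254000_0001_r_000000
            System.out.println(new TaskID(jtIdentifier, 1, false, 0).toString());
        }
    }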

Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestFindQuantiles.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestFindQuantiles.java Mon Oct 17 15:17:34 2016
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
@@ -38,10 +37,10 @@ import org.apache.pig.impl.builtin.FindQ
 import org.junit.Test;
 
 public class TestFindQuantiles {
-    
+
     private static TupleFactory tFact = TupleFactory.getInstance();
     private static final float epsilon = 0.0001f;
-    
+
     @Test
     public void testFindQuantiles() throws Exception {
        final int numSamples = 97778;
@@ -50,7 +49,7 @@ public class TestFindQuantiles {
        System.out.println("sum: " + sum);
        assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
-    
+
     @Test
     public void testFindQuantiles2() throws Exception {
        final int numSamples = 30000;
@@ -86,7 +85,7 @@ public class TestFindQuantiles {
     }
 
     private float[] getProbVec(Tuple values) throws Exception {
-        float[] probVec = new float[values.size()];        
+        float[] probVec = new float[values.size()];
         for(int i = 0; i < values.size(); i++) {
             probVec[i] = (Float)values.get(i);
         }
@@ -95,7 +94,7 @@ public class TestFindQuantiles {
 
     private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
         Random rand = new Random(1000);
-        List<Tuple> samples = new ArrayList<Tuple>(); 
+        List<Tuple> samples = new ArrayList<Tuple>();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, rand.nextInt(max));
@@ -106,7 +105,7 @@ public class TestFindQuantiles {
     }
 
     private DataBag generateUniqueSamples(int numSamples) throws Exception {
-        DataBag samples = BagFactory.getInstance().newDefaultBag(); 
+        DataBag samples = BagFactory.getInstance().newDefaultBag();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, new Integer(23));
@@ -121,9 +120,9 @@ public class TestFindQuantiles {
 
         in.set(0, new Integer(numReduceres));
         in.set(1, samples);
-        
+
         FindQuantiles fq = new FindQuantiles();
-        
+
         Map<String, Object> res = fq.exec(in);
         return res;
     }
@@ -135,12 +134,11 @@ public class TestFindQuantiles {
         InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
         Iterator<Object> it = weightedPartsData.values().iterator();
         float[] probVec = getProbVec((Tuple)it.next());
-        new DiscreteProbabilitySampleGenerator(probVec);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         return sum;
     }
-    
+
 }

Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestGFCross.java?rev=1765307&r1=1765306&r2=1765307&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestGFCross.java Mon Oct 17 15:17:34 2016
@@ -20,6 +20,7 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -50,6 +51,7 @@ public class TestGFCross {
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
+        cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -66,6 +68,7 @@ public class TestGFCross {
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
+        cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);