You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/02/18 03:14:12 UTC

svn commit: r1071864 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer...

Author: daijy
Date: Fri Feb 18 02:14:11 2011
New Revision: 1071864

URL: http://svn.apache.org/viewvc?rev=1071864&view=rev
Log:
PIG-1831: Indeterministic behavior in local mode due to static variable PigMapReduce.sJobConf

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    pig/trunk/src/org/apache/pig/builtin/Distinct.java
    pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
    pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
    pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
    pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    pig/trunk/test/org/apache/pig/test/TestFRJoin.java
    pig/trunk/test/org/apache/pig/test/TestFinish.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
    pig/trunk/test/org/apache/pig/test/utils/FILTERFROMFILE.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Feb 18 02:14:11 2011
@@ -299,6 +299,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1831: Indeterministic behavior in local mode due to static variable PigMapReduce.sJobConf (daijy)
+
 PIG-1841: TupleSize implemented incorrectly (laukik via daijy)
 
 PIG-1843: NPE in schema generation (daijy)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java Fri Feb 18 02:14:11 2011
@@ -77,7 +77,7 @@ public class LookupInFiles extends EvalF
         }
         else
         {
-            Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf);
+            Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConfInternal.get());
             for (int i = 0; i < mFiles.size(); ++i) {
                 // Files contain only 1 column with the key. No Schema. All keys
                 // separated by new line.

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Feb 18 02:14:11 2011
@@ -107,7 +107,7 @@ public abstract class PigMapBase extends
             return;
         }
             
-        if(PigMapReduce.sJobConf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
+        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
             // If there is a stream in the pipeline or if this map job belongs to merge-join we could 
             // potentially have more to process - so lets
             // set the flag stating that all map input has been sent
@@ -156,6 +156,7 @@ public abstract class PigMapBase extends
         Configuration job = context.getConfiguration();
         SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
         PigMapReduce.sJobContext = context;
+        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
         PigMapReduce.sJobConf = context.getConfiguration();
         inIllustrator = (context instanceof IllustratorContext);
         

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Feb 18 02:14:11 2011
@@ -97,10 +97,10 @@ public class PigMapReduce {
      * the job's {@link Configuration}:
      * <pre>UdfContext.getUdfContext().getJobConf()</pre>
      */
-    // This is used by internal pig code - it is deprecated for user code but is
-    // used by Pig internal code to set up UDFContext's conf among other things.
     @Deprecated
     public static Configuration sJobConf = null;
+    
+    public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
     private final static Tuple DUMMYTUPLE = null;
     
     public static class Map extends PigMapBase {
@@ -310,6 +310,7 @@ public class PigMapReduce {
             Configuration jConf = context.getConfiguration();
             SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
             sJobContext = context;
+            sJobConfInternal.set(context.getConfiguration());
             sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
@@ -502,7 +503,7 @@ public class PigMapReduce {
                 return;
             }
             
-            if(PigMapReduce.sJobConf.get("pig.stream.in.reduce", "false").equals("true")) {
+            if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
                 // If there is a stream in the pipeline we could 
                 // potentially have more to process - so lets
                 // set the flag stating that all map input has been sent

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Feb 18 02:14:11 2011
@@ -103,6 +103,7 @@ public class SkewedPartitioner extends P
     @Override
     public void setConf(Configuration job) {
         conf = job;
+        PigMapReduce.sJobConfInternal.set(conf);
         PigMapReduce.sJobConf = conf;
         String keyDistFile = job.get("pig.keyDistFile", "");
         if (keyDistFile.length() == 0)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Fri Feb 18 02:14:11 2011
@@ -176,8 +176,8 @@ public class POCollectedGroup extends Ph
             // the first time, just create a new buffer and continue.
             if (prevKey == null && outputBag == null) {
 
-                if (PigMapReduce.sJobConf != null) {
-                    String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (PigMapReduce.sJobConfInternal.get() != null) {
+                    String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
                     if (bagType != null && bagType.equalsIgnoreCase("default")) {
                         useDefaultBag = true;
                     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Fri Feb 18 02:14:11 2011
@@ -119,8 +119,8 @@ public class POCombinerPackage extends P
 
     private DataBag createDataBag(int numBags) {
     	String bagType = null;
-        if (PigMapReduce.sJobConf != null) {
-   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+   			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");       			
    	    }
                 		          	           		
     	if (bagType != null && bagType.equalsIgnoreCase("default")) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Fri Feb 18 02:14:11 2011
@@ -89,8 +89,8 @@ public class PODistinct extends Physical
             // by default, we create InternalSortedBag, unless user configures
 			// explicitly to use old bag
            	String bagType = null;
-            if (PigMapReduce.sJobConf != null) {
-       			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");       			
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+       			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");       			
        	    }            
             if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
             	distinctBag = BagFactory.getInstance().newDistinctBag();    			

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Fri Feb 18 02:14:11 2011
@@ -107,8 +107,8 @@ public class POJoinPackage extends POPac
         
         if(firstTime){
             firstTime = false;
-            if (PigMapReduce.sJobConf != null) {
-                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
                 if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     useDefaultBag = true;
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Feb 18 02:14:11 2011
@@ -341,7 +341,7 @@ public class POMergeCogroup extends Phys
 
             LoadFunc loadfunc = (LoadFunc)PigContext.instantiateFuncFromSpec(sidFuncSpecs.get(i));
             loadfunc.setUDFContextSignature(loaderSignatures.get(i));
-            Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConf));
+            Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConfInternal.get()));
             loadfunc.setLocation(sideFileSpecs.get(i), dummyJob);
             ((IndexableLoadFunc)loadfunc).initialize(dummyJob.getConfiguration());
             sideLoaders.add(loadfunc);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 18 02:14:11 2011
@@ -405,7 +405,7 @@ public class POMergeJoin extends Physica
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         rightLoader.setUDFContextSignature(signature);
-        Job job = new Job(new Configuration(PigMapReduce.sJobConf));
+        Job job = new Job(new Configuration(PigMapReduce.sJobConfInternal.get()));
         rightLoader.setLocation(rightInputFileName, job);
         ((IndexableLoadFunc)rightLoader).initialize(job.getConfiguration());
         ((IndexableLoadFunc)rightLoader).seekNear(

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Feb 18 02:14:11 2011
@@ -220,8 +220,8 @@ public class POPackage extends PhysicalO
         
         if(firstTime){
             firstTime = false;
-            if (PigMapReduce.sJobConf != null) {
-                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
                 if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     useDefaultBag = true;
                 }
@@ -458,8 +458,8 @@ public class POPackage extends PhysicalO
         @SuppressWarnings("unchecked")
         public POPackageTupleBuffer() {    		
             batchSize = 20000;
-            if (PigMapReduce.sJobConf != null) {
-                String size = PigMapReduce.sJobConf.get("pig.accumulative.batchsize");
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
                 if (size != null) {
                     batchSize = Integer.parseInt(size);
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Feb 18 02:14:11 2011
@@ -81,7 +81,7 @@ public class POPartitionRearrange extend
 
     /* Loads the key distribution file obtained from the sampler */
     private void loadPartitionFile() throws RuntimeException {
-        String keyDistFile = PigMapReduce.sJobConf.get("pig.keyDistFile", "");
+        String keyDistFile = PigMapReduce.sJobConfInternal.get().get("pig.keyDistFile", "");
         if (keyDistFile.isEmpty()) {
             throw new RuntimeException(
             "Internal error: missing key distribution file property.");
@@ -89,9 +89,9 @@ public class POPartitionRearrange extend
 
         boolean tmpFileCompression = Utils.tmpFileCompression(pigContext);
         if (tmpFileCompression) {
-            PigMapReduce.sJobConf.setBoolean("pig.tmpfilecompression", true);
+            PigMapReduce.sJobConfInternal.get().setBoolean("pig.tmpfilecompression", true);
             try {
-                PigMapReduce.sJobConf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
+                PigMapReduce.sJobConfInternal.get().set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Feb 18 02:14:11 2011
@@ -261,8 +261,8 @@ public class POSort extends PhysicalOper
 			// by default, we create InternalSortedBag, unless user configures
 			// explicitly to use old bag
 			String bagType = null;
-	        if (PigMapReduce.sJobConf != null) {
-	   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");
+	        if (PigMapReduce.sJobConfInternal.get() != null) {
+	   			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.sort.type");
 	   	    }
             if (bagType != null && bagType.equalsIgnoreCase("default")) {
             	sortedBag = BagFactory.getInstance().newSortedBag(mComparator);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 18 02:14:11 2011
@@ -86,19 +86,19 @@ public class MapRedUtil {
         // use local file system to get the keyDistFile
         Configuration conf = new Configuration(false);            
         
-        if (PigMapReduce.sJobConf.get("fs.file.impl")!=null)
-            conf.set("fs.file.impl", PigMapReduce.sJobConf.get("fs.file.impl"));
-        if (PigMapReduce.sJobConf.get("fs.hdfs.impl")!=null)
-            conf.set("fs.hdfs.impl", PigMapReduce.sJobConf.get("fs.hdfs.impl"));
-        if (PigMapReduce.sJobConf.getBoolean("pig.tmpfilecompression", false))
+        if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null)
+            conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl"));
+        if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null)
+            conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl"));
+        if (PigMapReduce.sJobConfInternal.get().getBoolean("pig.tmpfilecompression", false))
         {
             conf.setBoolean("pig.tmpfilecompression", true);
-            if (PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec")!=null)
-                conf.set("pig.tmpfilecompression.codec", PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec"));
+            if (PigMapReduce.sJobConfInternal.get().get("pig.tmpfilecompression.codec")!=null)
+                conf.set("pig.tmpfilecompression.codec", PigMapReduce.sJobConfInternal.get().get("pig.tmpfilecompression.codec"));
         }
         conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
 
-        ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConf), conf, 
+        ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConfInternal.get()), conf, 
                 keyDistFile, 0);
         DataBag partitionList;
         Tuple t = loader.getNext();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri Feb 18 02:14:11 2011
@@ -86,7 +86,7 @@ public class HadoopExecutableManager ext
         }
         
         // Save a copy of the JobConf
-        job = PigMapReduce.sJobConf;
+        job = PigMapReduce.sJobConfInternal.get();
         
         // Save the output directory for the Pig Script
         scriptOutputDir = job.get("pig.streaming.task.output.dir");

Modified: pig/trunk/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/Distinct.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/Distinct.java Fri Feb 18 02:14:11 2011
@@ -117,8 +117,8 @@ public class Distinct  extends EvalFunc<
     	// by default, we create InternalSortedBag, unless user configures
 		// explicitly to use old bag
     	String bagType = null;
-        if (PigMapReduce.sJobConf != null) {     
-   			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");       			
+        if (PigMapReduce.sJobConfInternal.get() != null) {     
+   			bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");       			
    	    }
                       
     	if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	

Modified: pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Fri Feb 18 02:14:11 2011
@@ -57,8 +57,8 @@ public class InternalCachedBag extends D
     public InternalCachedBag(int bagCount) {       
         float percent = 0.2F;
         
-    	if (PigMapReduce.sJobConf != null) {
-    		String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+    	if (PigMapReduce.sJobConfInternal.get() != null) {
+    		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
     		if (usage != null) {
     			percent = Float.parseFloat(usage);
     		}

Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Fri Feb 18 02:14:11 2011
@@ -82,8 +82,8 @@ public class InternalDistinctBag extends
     public InternalDistinctBag(int bagCount, double percent) {        
         if (percent < 0) {
         	percent = 0.2F;            
-        	if (PigMapReduce.sJobConf != null) {
-        		String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+        	if (PigMapReduce.sJobConfInternal.get() != null) {
+        		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
         		if (usage != null) {
         			percent = Float.parseFloat(usage);
         		}

Modified: pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Fri Feb 18 02:14:11 2011
@@ -99,8 +99,8 @@ public class InternalSortedBag extends S
     public InternalSortedBag(int bagCount, double percent, Comparator<Tuple> comp) {
     	if (percent < 0) {
         	percent = 0.2F;            
-        	if (PigMapReduce.sJobConf != null) {
-        		String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+        	if (PigMapReduce.sJobConfInternal.get() != null) {
+        		String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
         		if (usage != null) {
         			percent = Float.parseFloat(usage);
         		}

Modified: pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Fri Feb 18 02:14:11 2011
@@ -194,7 +194,7 @@ public class DefaultIndexableLoader exte
     
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
         PigContext pc = (PigContext) ObjectSerializer
-                .deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
+                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
         
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         

Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Fri Feb 18 02:14:11 2011
@@ -156,7 +156,7 @@ public class FileLocalizer {
      * @throws IOException
      */
     public static InputStream openDFSFile(String fileName) throws IOException {
-        Configuration conf = PigMapReduce.sJobConf;
+        Configuration conf = PigMapReduce.sJobConfInternal.get();
         if (conf == null) {
             throw new RuntimeException(
                     "can't open DFS file while executing locally");
@@ -173,7 +173,7 @@ public class FileLocalizer {
     }
     
     public static long getSize(String fileName) throws IOException {
-    	Configuration conf = PigMapReduce.sJobConf;
+    	Configuration conf = PigMapReduce.sJobConfInternal.get();
 
     	if (conf == null) {
     		throw new RuntimeException(

Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin.java Fri Feb 18 02:14:11 2011
@@ -139,7 +139,7 @@ public class TestFRJoin extends TestCase
         private void setUpHashTable() throws IOException {
             FileSpec replFile = new FileSpec(repl,new FuncSpec(PigStorage.class.getName()+"()"));
             POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile);
-            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConfInternal.get()));
             try {
                 pc.connect();
             

Modified: pig/trunk/test/org/apache/pig/test/TestFinish.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFinish.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFinish.java Fri Feb 18 02:14:11 2011
@@ -73,7 +73,7 @@ public class TestFinish {
         @Override
         public void finish() {
             try {
-                FileSystem fs = FileSystem.get(PigMapReduce.sJobConf);
+                FileSystem fs = FileSystem.get(PigMapReduce.sJobConfInternal.get());
                 fs.create(new Path(expectedFileName));
             } catch (IOException e) {
                 throw new RuntimeException("Unable to create file:" + expectedFileName);

Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Fri Feb 18 02:14:11 2011
@@ -1901,7 +1901,7 @@ public class TestPruneColumn extends Tes
         pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
         pigServer.executeBatch();
 
-        BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString())));
+        BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
         String line = reader1.readLine();
         assertTrue(line.equals("1\t2\t3"));
         
@@ -1910,7 +1910,7 @@ public class TestPruneColumn extends Tes
         
         assertTrue(reader1.readLine()==null);
         
-        BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString())));
+        BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString(), pigServer.getPigContext().getProperties())));
         line = reader2.readLine();
         assertTrue(line.equals("3"));
         

Modified: pig/trunk/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=1071864&r1=1071863&r2=1071864&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/FILTERFROMFILE.java (original)
+++ pig/trunk/test/org/apache/pig/test/utils/FILTERFROMFILE.java Fri Feb 18 02:14:11 2011
@@ -63,7 +63,7 @@ public class FILTERFROMFILE extends Filt
 	    
 		lookupTable = new HashMap<String, Boolean>();
 		
-		Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf);
+		Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConfInternal.get());
 		InputStream is = FileLocalizer.openDFSFile(FilterFileName, props);
 
 		BufferedReader reader = new BufferedReader(new InputStreamReader(is));