You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/02/18 03:09:13 UTC
svn commit: r1071862 - in /pig/branches/branch-0.8: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/...
Author: daijy
Date: Fri Feb 18 02:09:11 2011
New Revision: 1071862
URL: http://svn.apache.org/viewvc?rev=1071862&view=rev
Log:
PIG-1831: Indeterministic behavior in local mode due to static variable PigMapReduce.sJobConf
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
pig/branches/branch-0.8/src/org/apache/pig/builtin/Distinct.java
pig/branches/branch-0.8/src/org/apache/pig/data/InternalCachedBag.java
pig/branches/branch-0.8/src/org/apache/pig/data/InternalDistinctBag.java
pig/branches/branch-0.8/src/org/apache/pig/data/InternalSortedBag.java
pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
pig/branches/branch-0.8/src/org/apache/pig/impl/io/FileLocalizer.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestFinish.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java
pig/branches/branch-0.8/test/org/apache/pig/test/utils/FILTERFROMFILE.java
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Fri Feb 18 02:09:11 2011
@@ -213,6 +213,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1831: Indeterministic behavior in local mode due to static variable PigMapReduce.sJobConf (daijy)
+
PIG-1841: TupleSize implemented incorrectly (laukik via daijy)
PIG-1843: NPE in schema generation (daijy)
Modified: pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java (original)
+++ pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/LookupInFiles.java Fri Feb 18 02:09:11 2011
@@ -77,7 +77,7 @@ public class LookupInFiles extends EvalF
}
else
{
- Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf);
+ Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConfInternal.get());
for (int i = 0; i < mFiles.size(); ++i) {
// Files contain only 1 column with the key. No Schema. All keys
// separated by new line.
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Feb 18 02:09:11 2011
@@ -92,7 +92,7 @@ public abstract class PigMapBase extends
return;
}
- if(PigMapReduce.sJobConf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
+ if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
// If there is a stream in the pipeline or if this map job belongs to merge-join we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
@@ -141,6 +141,7 @@ public abstract class PigMapBase extends
Configuration job = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
PigMapReduce.sJobContext = context;
+ PigMapReduce.sJobConfInternal.set(context.getConfiguration());
PigMapReduce.sJobConf = context.getConfiguration();
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Feb 18 02:09:11 2011
@@ -89,10 +89,10 @@ public class PigMapReduce {
* the job's {@link Configuration}:
* <pre>UdfContext.getUdfContext().getJobConf()</pre>
*/
- // This is used by internal pig code - it is deprecated for user code but is
- // used by Pig internal code to set up UDFContext's conf among other things.
@Deprecated
public static Configuration sJobConf = null;
+
+ public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
private final static Tuple DUMMYTUPLE = null;
public static class Map extends PigMapBase {
@@ -291,6 +291,7 @@ public class PigMapReduce {
Configuration jConf = context.getConfiguration();
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
sJobContext = context;
+ sJobConfInternal.set(context.getConfiguration());
sJobConf = context.getConfiguration();
try {
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
@@ -480,7 +481,7 @@ public class PigMapReduce {
return;
}
- if(PigMapReduce.sJobConf.get("pig.stream.in.reduce", "false").equals("true")) {
+ if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
// If there is a stream in the pipeline we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Feb 18 02:09:11 2011
@@ -103,6 +103,7 @@ public class SkewedPartitioner extends P
@Override
public void setConf(Configuration job) {
conf = job;
+ PigMapReduce.sJobConfInternal.set(conf);
PigMapReduce.sJobConf = conf;
String keyDistFile = job.get("pig.keyDistFile", "");
if (keyDistFile.length() == 0)
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Fri Feb 18 02:09:11 2011
@@ -204,8 +204,8 @@ public class POCollectedGroup extends Ph
// the first time, just create a new buffer and continue.
if (prevKey == null && outputBag == null) {
- if (PigMapReduce.sJobConf != null) {
- String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Fri Feb 18 02:09:11 2011
@@ -129,8 +129,8 @@ public class POCombinerPackage extends P
private DataBag createDataBag(int numBags) {
String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
}
if (bagType != null && bagType.equalsIgnoreCase("default")) {
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Fri Feb 18 02:09:11 2011
@@ -82,8 +82,8 @@ public class PODistinct extends Physical
// by default, we create InternalSortedBag, unless user configures
// explicitly to use old bag
String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
}
if (bagType != null && bagType.equalsIgnoreCase("default")) {
distinctBag = BagFactory.getInstance().newDistinctBag();
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Fri Feb 18 02:09:11 2011
@@ -107,8 +107,8 @@ public class POJoinPackage extends POPac
if(firstTime){
firstTime = false;
- if (PigMapReduce.sJobConf != null) {
- String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Feb 18 02:09:11 2011
@@ -336,7 +336,7 @@ public class POMergeCogroup extends Phys
LoadFunc loadfunc = (LoadFunc)PigContext.instantiateFuncFromSpec(sidFuncSpecs.get(i));
loadfunc.setUDFContextSignature(loaderSignatures.get(i));
- Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConf));
+ Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConfInternal.get()));
loadfunc.setLocation(sideFileSpecs.get(i), dummyJob);
((IndexableLoadFunc)loadfunc).initialize(dummyJob.getConfiguration());
sideLoaders.add(loadfunc);
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 18 02:09:11 2011
@@ -399,7 +399,7 @@ public class POMergeJoin extends Physica
// Pass signature of the loader to rightLoader
// make a copy of the conf to use in calls to rightLoader.
rightLoader.setUDFContextSignature(signature);
- Job job = new Job(new Configuration(PigMapReduce.sJobConf));
+ Job job = new Job(new Configuration(PigMapReduce.sJobConfInternal.get()));
rightLoader.setLocation(rightInputFileName, job);
((IndexableLoadFunc)rightLoader).initialize(job.getConfiguration());
((IndexableLoadFunc)rightLoader).seekNear(
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Feb 18 02:09:11 2011
@@ -225,8 +225,8 @@ public class POPackage extends PhysicalO
if(firstTime){
firstTime = false;
- if (PigMapReduce.sJobConf != null) {
- String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
if (bagType != null && bagType.equalsIgnoreCase("default")) {
useDefaultBag = true;
}
@@ -471,8 +471,8 @@ public class POPackage extends PhysicalO
@SuppressWarnings("unchecked")
public POPackageTupleBuffer() {
batchSize = 20000;
- if (PigMapReduce.sJobConf != null) {
- String size = PigMapReduce.sJobConf.get("pig.accumulative.batchsize");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize");
if (size != null) {
batchSize = Integer.parseInt(size);
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Feb 18 02:09:11 2011
@@ -81,7 +81,7 @@ public class POPartitionRearrange extend
/* Loads the key distribution file obtained from the sampler */
private void loadPartitionFile() throws RuntimeException {
- String keyDistFile = PigMapReduce.sJobConf.get("pig.keyDistFile", "");
+ String keyDistFile = PigMapReduce.sJobConfInternal.get().get("pig.keyDistFile", "");
if (keyDistFile.isEmpty()) {
throw new RuntimeException(
"Internal error: missing key distribution file property.");
@@ -89,9 +89,9 @@ public class POPartitionRearrange extend
boolean tmpFileCompression = Utils.tmpFileCompression(pigContext);
if (tmpFileCompression) {
- PigMapReduce.sJobConf.setBoolean("pig.tmpfilecompression", true);
+ PigMapReduce.sJobConfInternal.get().setBoolean("pig.tmpfilecompression", true);
try {
- PigMapReduce.sJobConf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
+ PigMapReduce.sJobConfInternal.get().set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
} catch (Exception e) {
throw new RuntimeException(e);
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Feb 18 02:09:11 2011
@@ -269,8 +269,8 @@ public class POSort extends PhysicalOper
// by default, we create InternalSortedBag, unless user configures
// explicitly to use old bag
String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.sort.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.sort.type");
}
if (bagType != null && bagType.equalsIgnoreCase("default")) {
sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 18 02:09:11 2011
@@ -85,19 +85,19 @@ public class MapRedUtil {
// use local file system to get the keyDistFile
Configuration conf = new Configuration(false);
- if (PigMapReduce.sJobConf.get("fs.file.impl")!=null)
- conf.set("fs.file.impl", PigMapReduce.sJobConf.get("fs.file.impl"));
- if (PigMapReduce.sJobConf.get("fs.hdfs.impl")!=null)
- conf.set("fs.hdfs.impl", PigMapReduce.sJobConf.get("fs.hdfs.impl"));
- if (PigMapReduce.sJobConf.getBoolean("pig.tmpfilecompression", false))
+ if (PigMapReduce.sJobConfInternal.get().get("fs.file.impl")!=null)
+ conf.set("fs.file.impl", PigMapReduce.sJobConfInternal.get().get("fs.file.impl"));
+ if (PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl")!=null)
+ conf.set("fs.hdfs.impl", PigMapReduce.sJobConfInternal.get().get("fs.hdfs.impl"));
+ if (PigMapReduce.sJobConfInternal.get().getBoolean("pig.tmpfilecompression", false))
{
conf.setBoolean("pig.tmpfilecompression", true);
- if (PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec")!=null)
- conf.set("pig.tmpfilecompression.codec", PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec"));
+ if (PigMapReduce.sJobConfInternal.get().get("pig.tmpfilecompression.codec")!=null)
+ conf.set("pig.tmpfilecompression.codec", PigMapReduce.sJobConfInternal.get().get("pig.tmpfilecompression.codec"));
}
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConf), conf,
+ ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConfInternal.get()), conf,
keyDistFile, 0);
DataBag partitionList;
Tuple t = loader.getNext();
Modified: pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri Feb 18 02:09:11 2011
@@ -86,7 +86,7 @@ public class HadoopExecutableManager ext
}
// Save a copy of the JobConf
- job = PigMapReduce.sJobConf;
+ job = PigMapReduce.sJobConfInternal.get();
// Save the output directory for the Pig Script
scriptOutputDir = job.get("pig.streaming.task.output.dir");
Modified: pig/branches/branch-0.8/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/builtin/Distinct.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/builtin/Distinct.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/builtin/Distinct.java Fri Feb 18 02:09:11 2011
@@ -117,8 +117,8 @@ public class Distinct extends EvalFunc<
// by default, we create InternalSortedBag, unless user configures
// explicitly to use old bag
String bagType = null;
- if (PigMapReduce.sJobConf != null) {
- bagType = PigMapReduce.sJobConf.get("pig.cachedbag.distinct.type");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
}
if (bagType != null && bagType.equalsIgnoreCase("default")) {
Modified: pig/branches/branch-0.8/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/data/InternalCachedBag.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/data/InternalCachedBag.java Fri Feb 18 02:09:11 2011
@@ -57,8 +57,8 @@ public class InternalCachedBag extends D
public InternalCachedBag(int bagCount) {
float percent = 0.2F;
- if (PigMapReduce.sJobConf != null) {
- String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
if (usage != null) {
percent = Float.parseFloat(usage);
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/data/InternalDistinctBag.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/data/InternalDistinctBag.java Fri Feb 18 02:09:11 2011
@@ -82,8 +82,8 @@ public class InternalDistinctBag extends
public InternalDistinctBag(int bagCount, double percent) {
if (percent < 0) {
percent = 0.2F;
- if (PigMapReduce.sJobConf != null) {
- String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
if (usage != null) {
percent = Float.parseFloat(usage);
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/data/InternalSortedBag.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/data/InternalSortedBag.java Fri Feb 18 02:09:11 2011
@@ -99,8 +99,8 @@ public class InternalSortedBag extends S
public InternalSortedBag(int bagCount, double percent, Comparator<Tuple> comp) {
if (percent < 0) {
percent = 0.2F;
- if (PigMapReduce.sJobConf != null) {
- String usage = PigMapReduce.sJobConf.get("pig.cachedbag.memusage");
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
if (usage != null) {
percent = Float.parseFloat(usage);
}
Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Fri Feb 18 02:09:11 2011
@@ -194,7 +194,7 @@ public class DefaultIndexableLoader exte
private void initRightLoader(int [] splitsToBeRead) throws IOException{
PigContext pc = (PigContext) ObjectSerializer
- .deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
+ .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
Modified: pig/branches/branch-0.8/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/io/FileLocalizer.java Fri Feb 18 02:09:11 2011
@@ -156,7 +156,7 @@ public class FileLocalizer {
* @throws IOException
*/
public static InputStream openDFSFile(String fileName) throws IOException {
- Configuration conf = PigMapReduce.sJobConf;
+ Configuration conf = PigMapReduce.sJobConfInternal.get();
if (conf == null) {
throw new RuntimeException(
"can't open DFS file while executing locally");
@@ -173,7 +173,7 @@ public class FileLocalizer {
}
public static long getSize(String fileName) throws IOException {
- Configuration conf = PigMapReduce.sJobConf;
+ Configuration conf = PigMapReduce.sJobConfInternal.get();
if (conf == null) {
throw new RuntimeException(
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestFRJoin.java Fri Feb 18 02:09:11 2011
@@ -139,7 +139,7 @@ public class TestFRJoin extends TestCase
private void setUpHashTable() throws IOException {
FileSpec replFile = new FileSpec(repl,new FuncSpec(PigStorage.class.getName()+"()"));
POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile);
- PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+ PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConfInternal.get()));
try {
pc.connect();
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestFinish.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestFinish.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestFinish.java Fri Feb 18 02:09:11 2011
@@ -73,7 +73,7 @@ public class TestFinish {
@Override
public void finish() {
try {
- FileSystem fs = FileSystem.get(PigMapReduce.sJobConf);
+ FileSystem fs = FileSystem.get(PigMapReduce.sJobConfInternal.get());
fs.create(new Path(expectedFileName));
} catch (IOException e) {
throw new RuntimeException("Unable to create file:" + expectedFileName);
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestPruneColumn.java Fri Feb 18 02:09:11 2011
@@ -1925,7 +1925,7 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
pigServer.executeBatch();
- BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString())));
+ BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
String line = reader1.readLine();
assertTrue(line.equals("1\t2\t3"));
@@ -1934,7 +1934,7 @@ public class TestPruneColumn extends Tes
assertTrue(reader1.readLine()==null);
- BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString())));
+ BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString(), pigServer.getPigContext().getProperties())));
line = reader2.readLine();
assertTrue(line.equals("3"));
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=1071862&r1=1071861&r2=1071862&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/utils/FILTERFROMFILE.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/utils/FILTERFROMFILE.java Fri Feb 18 02:09:11 2011
@@ -63,7 +63,7 @@ public class FILTERFROMFILE extends Filt
lookupTable = new HashMap<String, Boolean>();
- Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf);
+ Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConfInternal.get());
InputStream is = FileLocalizer.openDFSFile(FilterFileName, props);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));