You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/05 23:18:25 UTC

svn commit: r1394816 [1/2] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/par...

Author: jcoveney
Date: Fri Oct  5 21:18:23 2012
New Revision: 1394816

URL: http://svn.apache.org/viewvc?rev=1394816&view=rev
Log:
PIG-2877: Make SchemaTuple work in foreach (and thus, in loads) (jcoveney)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/data/BinInterSedes.java
    pig/trunk/src/org/apache/pig/data/SchemaTuple.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
    pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
    pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
    pig/trunk/test/org/apache/pig/test/TestBuiltin.java
    pig/trunk/test/org/apache/pig/test/TestCommit.java
    pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
    pig/trunk/test/org/apache/pig/test/TestLoad.java
    pig/trunk/test/org/apache/pig/test/TestPigServer.java
    pig/trunk/test/org/apache/pig/test/TestPigServerWithMacros.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct  5 21:18:23 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2877: Make SchemaTuple work in foreach (and thus, in loads) (jcoveney)
+
 PIG-2923: Lazily register bags with SpillableMemoryManager (dvryaboy)
 
 PIG-2929: Improve documentation around AVG, CONCAT, MIN, MAX (cheolsoo via billgraham)

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Fri Oct  5 21:18:23 2012
@@ -19,9 +19,15 @@
 package org.apache.pig;
 
 /**
- * Container for static configuration strings, defaults, etc.
+ * Container for static configuration strings, defaults, etc. This is intended just for keys that can
+ * be set by users, not for keys that are generally used within pig.
  */
 public class PigConfiguration {
+    private PigConfiguration() {}
+
+    /////////////////////////////////////////////////////////////////////////////////////
+    /////////////////////////       COMMAND LINE KEYS       /////////////////////////////
+    /////////////////////////////////////////////////////////////////////////////////////
 
     /**
      * Controls the fraction of total memory that is allowed to be used by
@@ -48,6 +54,23 @@ public class PigConfiguration {
     public static final String TIME_UDFS_PROP = "pig.udf.profile";
 
     /**
+     * This key must be set to true by the user for code generation to be used.
+     * In the future, it may be turned on by default (at least in certain cases),
+     * but for now it is too experimental.
+     */
+    public static final String SHOULD_USE_SCHEMA_TUPLE = "pig.schematuple";
+
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_UDF = "pig.schematuple.udf";
+
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH = "pig.schematuple.foreach";
+
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN = "pig.schematuple.fr_join";
+
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN = "pig.schematuple.merge_join";
+
+    public static final String SCHEMA_TUPLE_SHOULD_ALLOW_FORCE = "pig.schematuple.force";
+
+    /**
      * Turns off use of combiners in MapReduce jobs produced by Pig.
      */
     public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Oct  5 21:18:23 2012
@@ -280,7 +280,7 @@ public class HExecutionEngine {
         SortInfoSetter sortInfoSetter = new SortInfoSetter( plan );
         sortInfoSetter.visit();
         
-        if (pigContext.inExplain==false) {
+        if (!pigContext.inExplain) {
             // Validate input/output file. Currently no validation framework in
             // new logical plan, put this validator here first.
             // We might decide to move it out to a validator framework in future

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Oct  5 21:18:23 2012
@@ -91,7 +91,7 @@ public class HJob implements ExecJob {
              
              p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, 
                      ConfigurationUtil.toConfiguration(
-                     pigContext.getProperties()), outFileSpec.getFileName(), 0);
+                     pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext);
 
         }catch (Exception e){
             int errCode = 2088;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct  5 21:18:23 2012
@@ -25,7 +25,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -35,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -70,13 +68,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
@@ -90,11 +86,11 @@ import org.apache.pig.impl.io.FileLocali
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.NullableBooleanWritable;
 import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDateTimeWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
 import org.apache.pig.impl.io.NullableFloatWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableLongWritable;
-import org.apache.pig.impl.io.NullableDateTimeWritable;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.io.NullableTuple;
@@ -446,7 +442,6 @@ public class JobControlCompiler{
             ss.addSettingsToConf(mro, conf);
         }
 
-
         conf.set("mapred.mapper.new-api", "true");
         conf.set("mapred.reducer.new-api", "true");
 
@@ -655,7 +650,7 @@ public class JobControlCompiler{
             setupDistributedCacheForJoin(mro, pigContext, conf);
 
             // Search to see if we have any UDFs that need to pack things into the
-            // distrubted cache.
+            // distributed cache.
             setupDistributedCacheForUdfs(mro, pigContext, conf);
 
             SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Oct  5 21:18:23 2012
@@ -172,7 +172,7 @@ public abstract class PigGenericMapBase 
         pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
         // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-        SchemaTupleBackend.initialize(job, pigContext.getExecType());
+        SchemaTupleBackend.initialize(job, pigContext);
 
         if (pigContext.getLog4jProperties()!=null)
             PropertyConfigurator.configure(pigContext.getLog4jProperties());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Oct  5 21:18:23 2012
@@ -316,7 +316,7 @@ public class PigGenericMapReduce {
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
                 
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-                SchemaTupleBackend.initialize(jConf, pigContext.getExecType());
+                SchemaTupleBackend.initialize(jConf, pigContext);
 
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Oct  5 21:18:23 2012
@@ -43,11 +43,11 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.NullableBooleanWritable;
 import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDateTimeWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
 import org.apache.pig.impl.io.NullableFloatWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableLongWritable;
-import org.apache.pig.impl.io.NullableDateTimeWritable;
 import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -107,8 +107,6 @@ public class WeightedRangePartitioner ex
         }
         
         try{
-            
-            
             // use local file system to get the quantilesFile
             Configuration conf;
             if (pigContext.getExecType()==ExecType.MAPREDUCE) {
@@ -116,24 +114,25 @@ public class WeightedRangePartitioner ex
             } else {
                 conf = new Configuration(false);
             }
-            if (configuration.get("fs.file.impl")!=null)
+            if (configuration.get("fs.file.impl") != null) {
                 conf.set("fs.file.impl", configuration.get("fs.file.impl"));
-            if (configuration.get("fs.hdfs.impl")!=null)
+            }
+            if (configuration.get("fs.hdfs.impl") != null) {
                 conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
-            if (configuration.getBoolean("pig.tmpfilecompression", false))
-            {
+            }
+            if (configuration.getBoolean("pig.tmpfilecompression", false)) {
                 conf.setBoolean("pig.tmpfilecompression", true);
-                if (configuration.get("pig.tmpfilecompression.codec")!=null)
+                if (configuration.get("pig.tmpfilecompression.codec") != null) {
                     conf.set("pig.tmpfilecompression.codec", configuration.get("pig.tmpfilecompression.codec"));
             }
+            }
             conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
             
             ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
                     conf, quantilesFile, 0);
             DataBag quantilesList;
             Tuple t = loader.getNext();
-            if(t!=null)
-            {
+            if (t != null) {
                 // the Quantiles file has a tuple as under:
                 // (numQuantiles, bag of samples) 
                 // numQuantiles here is the reduce parallelism

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Oct  5 21:18:23 2012
@@ -292,7 +292,6 @@ public abstract class PhysicalOperator e
      * @throws ExecException
      */
     public Result processInput() throws ExecException {
-
         Result res = new Result();
         if (input == null && (inputs == null || inputs.size()==0)) {
 //            log.warn("No inputs found. Signaling End of Processing.");

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Oct  5 21:18:23 2012
@@ -36,8 +36,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.TupleMaker;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -48,15 +52,10 @@ import org.apache.pig.pen.util.LineageTr
 //We intentionally skip type checking in backend for performance reasons
 @SuppressWarnings("unchecked")
 public class POForEach extends PhysicalOperator {
-
-    /**
-     *
-     */
     private static final long serialVersionUID = 1L;
 
     protected List<PhysicalPlan> inputPlans;
     protected List<PhysicalOperator> opsToBeReset;
-    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
     //Since the plan has a generate, this needs to be maintained
     //as the generate can potentially return multiple tuples for
     //same call.
@@ -93,14 +92,12 @@ public class POForEach extends PhysicalO
 
     protected Tuple inpTuple;
 
+    private Schema schema;
+
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
 
-    public POForEach(OperatorKey k, int rp, List inp) {
-        this(k,rp,inp,null);
-    }
-
     public POForEach(OperatorKey k, int rp) {
         this(k,rp,null,null);
     }
@@ -117,6 +114,12 @@ public class POForEach extends PhysicalO
         getLeaves();
     }
 
+    public POForEach(OperatorKey operatorKey, int requestedParallelism,
+            List<PhysicalPlan> innerPlans, List<Boolean> flattenList, Schema schema) {
+        this(operatorKey, requestedParallelism, innerPlans, flattenList);
+        this.schema = schema;
+    }
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitPOForEach(this);
@@ -301,6 +304,8 @@ public class POForEach extends PhysicalO
     }
 
     private boolean isEarlyTerminated = false;
+    private TupleMaker<? extends Tuple> tupleMaker;
+    private boolean knownSize = false;
 
     private boolean isEarlyTerminated() {
         return isEarlyTerminated;
@@ -311,6 +316,19 @@ public class POForEach extends PhysicalO
     }
 
     protected Result processPlan() throws ExecException{
+        if (schema != null && tupleMaker == null) {
+            // Note here that if SchemaTuple is currently turned on, then any UDFs in the chain
+            // must follow good practices. Namely, they should not append to the Tuple that comes
+            // out of an iterator (a practice which is fairly common, but is not recommended).
+            tupleMaker = SchemaTupleFactory.getInstance(schema, false, GenContext.FOREACH);
+            if (tupleMaker != null) {
+                knownSize = true;
+            }
+        }
+        if (tupleMaker == null) {
+            tupleMaker = TupleFactory.getInstance();
+        }
+
         Result res = new Result();
 
         //We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
@@ -471,7 +489,9 @@ public class POForEach extends PhysicalO
      * @return the final flattened tuple
      */
     protected Tuple createTuple(Object[] data) throws ExecException {
-        Tuple out =  mTupleFactory.newTuple();
+        Tuple out =  tupleMaker.newTuple();
+
+        int idx = 0;
         for(int i = 0; i < data.length; ++i) {
             Object in = data[i];
 
@@ -479,12 +499,20 @@ public class POForEach extends PhysicalO
                 Tuple t = (Tuple)in;
                 int size = t.size();
                 for(int j = 0; j < size; ++j) {
+                    if (knownSize) {
+                        out.set(idx++, t.get(j));
+                    } else {
                     out.append(t.get(j));
                 }
+                }
+            } else {
+                if (knownSize) {
+                    out.set(idx++, in);
             } else {
                 out.append(in);
             }
         }
+        }
         if (inpTuple != null) {
             return illustratorMarkup(inpTuple, out, 0);
         } else {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Oct  5 21:18:23 2012
@@ -21,32 +21,28 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
-import org.apache.pig.SortInfo;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This implementation is applicable for both the physical plan and for the
@@ -304,19 +300,16 @@ public class POSort extends PhysicalOper
 
 	@Override
 	public boolean supportsMultipleInputs() {
-
 		return false;
 	}
 
 	@Override
 	public boolean supportsMultipleOutputs() {
-
 		return false;
 	}
 
 	@Override
 	public void visit(PhyPlanVisitor v) throws VisitorException {
-
 		v.visitSort(this);
 	}
 

Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Fri Oct  5 21:18:23 2012
@@ -28,9 +28,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +42,8 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.utils.SedesHelper;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
 /**
  * A class to handle reading and writing of intermediate results of data types. The serialization format used by this
@@ -168,7 +167,6 @@ public class BinInterSedes implements In
     }
 
     public int getTupleSize(DataInput in, byte type) throws IOException {
-
         int sz;
         switch (type) {
         case TUPLE_0:
@@ -542,8 +540,7 @@ public class BinInterSedes implements In
         }
 
         case DataType.CHARARRAY: {
-            String s = (String) val;
-            SedesHelper.writeChararray(out, s);
+            SedesHelper.writeChararray(out, (String) val);
             break;
         }
         case DataType.GENERIC_WRITABLECOMPARABLE:

Modified: pig/trunk/src/org/apache/pig/data/SchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTuple.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTuple.java Fri Oct  5 21:18:23 2012
@@ -34,9 +34,8 @@ import org.apache.pig.classification.Int
 import org.apache.pig.data.utils.MethodHelper;
 import org.apache.pig.data.utils.MethodHelper.NotImplemented;
 import org.apache.pig.data.utils.SedesHelper;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.mortbay.log.Log;
 
 import com.google.common.collect.Lists;
@@ -195,12 +194,12 @@ public abstract class SchemaTuple<T exte
     }
 
     protected static DataBag read(DataInput in, DataBag v) throws IOException {
-        return (DataBag) bis.readDatum(in, DataType.BAG);
+        return (DataBag) bis.readDatum(in);
     }
 
     @SuppressWarnings("unchecked")
     protected static Map<String, Object> read(DataInput in, Map<String, Object> v) throws IOException {
-        return (Map<String, Object>) bis.readDatum(in, DataType.MAP);
+        return (Map<String, Object>) bis.readDatum(in);
     }
 
     protected static int read(DataInput in, int v) throws IOException {
@@ -355,11 +354,11 @@ public abstract class SchemaTuple<T exte
     }
 
     protected DataBag unbox(Object v, DataBag t) {
-        return unbox((DataBag) t);
+        return unbox((DataBag) v);
     }
 
     protected Map<String, Object> unbox(Object v, Map<String, Object> t) {
-        return unbox((Map<String, Object>) t);
+        return unbox((Map<String, Object>) v);
     }
 
     protected byte[] unbox(Object v, byte[] t) {
@@ -839,10 +838,9 @@ public abstract class SchemaTuple<T exte
                 Log.warn("No Schema present in SchemaTuple generated class");
                 return new Schema();
             }
-            s = new String(Base64.decodeBase64(s));
-            return Utils.getSchemaFromString(s);
-        } catch (FrontendException e) {
-            throw new RuntimeException("Unable to make Schema for String: " + s);
+            return (Schema) ObjectSerializer.deserialize(s);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to deserialize serialized Schema: " + s, e);
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Fri Oct  5 21:18:23 2012
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.data;
 
+import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -31,9 +34,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConstants;
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.utils.StructuresHelper.SchemaKey;
 import org.apache.pig.data.utils.StructuresHelper.Triple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 import com.google.common.collect.Maps;
@@ -61,19 +66,6 @@ public class SchemaTupleBackend {
     private boolean abort = false;
 
     /**
-     * This key must be set to true by the user for code generation to be used.
-     * In the future, it may be turned on by default (at least in certain cases),
-     * but for now it is too experimental.
-     */
-    public static final String SHOULD_GENERATE_KEY = "pig.schematuple";
-
-    /**
-     * This key is used in the job conf to let the various jobs know what code was
-     * generated.
-     */
-    public static final String GENERATED_CLASSES_KEY = "pig.schematuple.classes";
-
-    /**
      * The only information this class needs is a directory of generated code to resolve
      * classes in.
      * @param jConf
@@ -81,13 +73,13 @@ public class SchemaTupleBackend {
      */
     private SchemaTupleBackend(Configuration jConf, boolean isLocal) {
         if (isLocal) {
-            String localCodeDir = jConf.get(SchemaTupleFrontend.LOCAL_CODE_DIR);
+            String localCodeDir = jConf.get(PigConstants.LOCAL_CODE_DIR);
             if (localCodeDir == null) {
                 LOG.debug("No local code dir set in local mode. Aborting code gen resolution.");
                 abort = true;
                 return;
             }
-            codeDir = new File(jConf.get(SchemaTupleFrontend.LOCAL_CODE_DIR));
+            codeDir = new File(jConf.get(PigConstants.LOCAL_CODE_DIR));
         } else {
             codeDir = Files.createTempDir();
             codeDir.deleteOnExit();
@@ -126,7 +118,7 @@ public class SchemaTupleBackend {
     private SchemaTupleFactory internalNewSchemaTupleFactory(int id) {
         SchemaTupleFactory stf = schemaTupleFactoriesById.get(id);
         if (stf == null) {
-            LOG.warn("No SchemaTupleFactory present for given identifier: " + id);
+            LOG.debug("No SchemaTupleFactory present for given identifier: " + id);
         }
         return stf;
     }
@@ -141,7 +133,7 @@ public class SchemaTupleBackend {
     private SchemaTupleFactory newSchemaTupleFactory(Triple<SchemaKey, Boolean, GenContext> trip) {
         SchemaTupleFactory stf = schemaTupleFactoriesByTriple.get(trip);
         if (stf == null) {
-            SchemaTupleFactory.LOG.warn("No SchemaTupleFactory present for given SchemaKey/Boolean/Context combination " + trip);
+            LOG.debug("No SchemaTupleFactory present for given SchemaKey/Boolean/Context combination " + trip);
         }
         return stf;
     }
@@ -159,9 +151,8 @@ public class SchemaTupleBackend {
             return;
         }
         // Step one is to see if there are any classes in the distributed cache
-        String shouldGenerate = jConf.get(SchemaTupleBackend.SHOULD_GENERATE_KEY);
-        if (shouldGenerate == null || !Boolean.parseBoolean(shouldGenerate)) {
-            LOG.info("Key [" + SchemaTupleBackend.SHOULD_GENERATE_KEY +"] was not set... aborting generation");
+        if (!jConf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+            LOG.info("Key [" + SHOULD_USE_SCHEMA_TUPLE +"] was not set... will not generate code.");
             return;
         }
         // Step two is to copy everything from the distributed cache if we are in distributed mode
@@ -183,12 +174,12 @@ public class SchemaTupleBackend {
     }
 
     private void copyAllFromDistributedCache() throws IOException {
-        String toDeserialize = jConf.get(GENERATED_CLASSES_KEY);
+        String toDeserialize = jConf.get(PigConstants.GENERATED_CLASSES_KEY);
         if (toDeserialize == null) {
-            LOG.info("No classes in in key [" + GENERATED_CLASSES_KEY + "] to copy from distributed cache.");
+            LOG.info("No classes in in key [" + PigConstants.GENERATED_CLASSES_KEY + "] to copy from distributed cache.");
             return;
         }
-        LOG.info("Copying files in key ["+GENERATED_CLASSES_KEY+"] from distributed cache: " + toDeserialize);
+        LOG.info("Copying files in key ["+PigConstants.GENERATED_CLASSES_KEY+"] from distributed cache: " + toDeserialize);
         for (String s : toDeserialize.split(",")) {
             LOG.info("Attempting to read file: " + s);
             // The string is the symlink into the distributed cache
@@ -273,22 +264,36 @@ public class SchemaTupleBackend {
 
     private static SchemaTupleBackend stb;
 
-    public static void initialize(Configuration jConf, ExecType type) throws IOException {
-        stb = new SchemaTupleBackend(jConf, type == ExecType.LOCAL);
+    public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
+        initialize(jConf, pigContext, pigContext.getExecType() == ExecType.LOCAL);
+    }
+
+    public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
+        if (stb != null) {
+            LOG.warn("SchemaTupleBackend has already been initialized");
+        } else {
+            SchemaTupleFrontend.lazyReset(pigContext);
+            SchemaTupleFrontend.reset();
+            stb = new SchemaTupleBackend(jConf, isLocal);
         stb.copyAndResolve();
     }
+    }
 
     public static SchemaTupleFactory newSchemaTupleFactory(Schema s, boolean isAppendable, GenContext context)  {
         if (stb == null) {
             // It is possible (though ideally should be avoided) for this to be called on the frontend if
             // the Tuple processing path of the POPlan is invoked (perhaps for optimization purposes)
-            LOG.warn("initialize was not called! Even when SchemaTuple feature is not set, it should be called.");
-            return null;
+            throw new RuntimeException("initialize was not called! Even when SchemaTuple feature is not set, it should be called.");
         }
         return stb.internalNewSchemaTupleFactory(s, isAppendable, context);
     }
 
     protected static SchemaTupleFactory newSchemaTupleFactory(int id) {
+        if (stb == null) {
+            // It is possible (though ideally should be avoided) for this to be called on the frontend if
+            // the Tuple processing path of the POPlan is invoked (perhaps for optimization purposes)
+            throw new RuntimeException("initialize was not called! Even when SchemaTuple feature is not set, it should be called.");
+        }
         return stb.internalNewSchemaTupleFactory(id);
     }
 }

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java Fri Oct  5 21:18:23 2012
@@ -18,6 +18,7 @@
 package org.apache.pig.data;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -25,14 +26,15 @@ import java.lang.annotation.Target;
 import java.util.List;
 import java.util.Queue;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.JavaCompilerHelper;
+import org.apache.pig.impl.util.ObjectSerializer;
 
 import com.google.common.collect.Lists;
 
@@ -59,28 +61,27 @@ public class SchemaTupleClassGenerator {
          * This context is used in UDF code. Currently, this is only used for
          * the inputs to UDF's.
          */
-        UDF ("pig.schematuple.udf", true, GenerateUdf.class),
+        UDF (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_UDF, true, GenerateUdf.class),
         /**
-         * This context is for LoadFuncs. It is currently not used,
-         * however the intent is that when a Schema is known, the
-         * LoadFunc can return typed Tuples.
+         * This context is for POForEach. This will use the expected output of a ForEach
+         * to return a typed Tuple.
          */
-        LOAD ("pig.schematuple.load", true, GenerateLoad.class),
+        FOREACH (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH, true, GenerateForeach.class),
         /**
          * This context controls whether or not SchemaTuples will be used in FR joins.
          * Currently, they will be used in the HashMap that FR Joins construct.
          */
-        FR_JOIN ("pig.schematuple.fr_join", true, GenerateFrJoin.class),
+        FR_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN, true, GenerateFrJoin.class),
         /**
          * This context controls whether or not SchemaTuples will be used in merge joins.
          */
-        MERGE_JOIN ("pig.schematuple.merge_join", true, GenerateMergeJoin.class),
+        MERGE_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class),
         /**
          * All registered Schemas will also be registered in one additional context.
          * This context will allow users to "force" the load of a SchemaTupleFactory
          * if one is present in any context.
          */
-        FORCE_LOAD ("pig.schematuple.force", true, GenerateForceLoad.class);
+        FORCE_LOAD (PigConfiguration.SCHEMA_TUPLE_SHOULD_ALLOW_FORCE, true, GenerateForceLoad.class);
 
         /**
          * These annotations are used to mark a given SchemaTuple with
@@ -93,7 +94,7 @@ public class SchemaTupleClassGenerator {
 
         @Retention(RetentionPolicy.RUNTIME)
         @Target(ElementType.TYPE)
-        public @interface GenerateLoad {}
+        public @interface GenerateForeach {}
 
         @Retention(RetentionPolicy.RUNTIME)
         @Target(ElementType.TYPE)
@@ -158,6 +159,10 @@ public class SchemaTupleClassGenerator {
      */
     private static int nextGlobalClassIdentifier = 0;
 
+    protected static void resetGlobalClassIdentifier() {
+        nextGlobalClassIdentifier = 0;
+    }
+
     /**
      * This class actually generates the code for a given Schema.
      * @param   schema
@@ -319,9 +324,12 @@ public class SchemaTupleClassGenerator {
         private File codeDir;
 
         public void prepare() {
-            String s = schema.toString();
-            s = s.substring(1, s.length() - 1);
-            s = Base64.encodeBase64URLSafeString(s.getBytes());
+            String s;
+            try {
+                s = ObjectSerializer.serialize(schema);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to serialize schema: " + schema, e);
+            }
             add("private static Schema schema = staticSchemaGen(\"" + s + "\");");
         }
 

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java Fri Oct  5 21:18:23 2012
@@ -52,7 +52,7 @@ public class SchemaTupleFactory implemen
      * @return  true if it is generatable
      */
     public static boolean isGeneratable(Schema s) {
-        if (s == null) {
+        if (s == null || s.size() == 0) {
             return false;
         }
 

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java Fri Oct  5 21:18:23 2012
@@ -17,11 +17,17 @@
  */
 package org.apache.pig.data;
 
+import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConstants.GENERATED_CLASSES_KEY;
+import static org.apache.pig.PigConstants.LOCAL_CODE_DIR;
+import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -45,7 +51,6 @@ import com.google.common.io.Files;
 /**
  * This class is to be used at job creation time. It provides the API that lets code
  * register Schemas with pig to be generated. It is necessary to register these Schemas
- * so that the generated code can be made on the client side, and shipped to the mappers
  * and reducers.
  */
 public class SchemaTupleFrontend {
@@ -110,10 +115,12 @@ public class SchemaTupleFrontend {
                 String codePath = codeDir.getAbsolutePath();
                 LOG.info("Distributed cache not supported or needed in local mode. Setting key ["
                         + LOCAL_CODE_DIR + "] with code temp directory: " + codePath);
-                if (pigContext.getExecType() == ExecType.LOCAL) {
                     conf.set(LOCAL_CODE_DIR, codePath);
-                }
                 return;
+            } else {
+                // This lets us avoid NPE in some of the non-traditional pipelines
+                String codePath = codeDir.getAbsolutePath();
+                conf.set(LOCAL_CODE_DIR, codePath);
             }
             DistributedCache.createSymlink(conf); // we will read using symlinks
             StringBuilder serialized = new StringBuilder();
@@ -157,9 +164,9 @@ public class SchemaTupleFrontend {
                 LOG.info("File successfully added to the distributed cache: " + symlink);
             }
             String toSer = serialized.toString();
-            LOG.info("Setting key [" + SchemaTupleBackend.GENERATED_CLASSES_KEY + "] with classes to deserialize [" + toSer + "]");
+            LOG.info("Setting key [" + GENERATED_CLASSES_KEY + "] with classes to deserialize [" + toSer + "]");
             // we must set a key in the job conf so individual jobs know to resolve the shipped classes
-            conf.set(SchemaTupleBackend.GENERATED_CLASSES_KEY, toSer);
+            conf.set(GENERATED_CLASSES_KEY, toSer);
         }
 
         /**
@@ -169,9 +176,8 @@ public class SchemaTupleFrontend {
          */
         private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) {
             boolean filesToShip = false;
-            String shouldString = conf.get(SchemaTupleBackend.SHOULD_GENERATE_KEY);
-            if (shouldString == null || !Boolean.parseBoolean(shouldString)) {
-                LOG.info("Key ["+SchemaTupleBackend.SHOULD_GENERATE_KEY+"] is false, aborting generation.");
+            if (!conf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+                LOG.info("Key ["+SHOULD_USE_SCHEMA_TUPLE+"] is false, will not generate code.");
                 return false;
             }
             LOG.info("Generating all registered Schemas.");
@@ -222,6 +228,14 @@ public class SchemaTupleFrontend {
      */
     public static int registerToGenerateIfPossible(Schema udfSchema, boolean isAppendable, GenContext context) {
         if (stf == null) {
+            if (pigContextToReset != null) {
+                Properties prop = pigContextToReset.getProperties();
+                prop.remove(GENERATED_CLASSES_KEY);
+                prop.remove(LOCAL_CODE_DIR);
+                pigContextToReset = null;
+            }
+            SchemaTupleBackend.reset();
+            SchemaTupleClassGenerator.resetGlobalClassIdentifier();
             stf = new SchemaTupleFrontend();
         }
 
@@ -249,12 +263,6 @@ public class SchemaTupleFrontend {
     }
 
     /**
-     * This key is used when a job is run in local mode to pass the location of the generated code
-     * from the frontent to the "backend."
-     */
-    protected static final String LOCAL_CODE_DIR = "pig.schematuple.local.dir";
-
-    /**
      * This must be called when the code has been generated and the generated code needs to be shipped
      * to the cluster, so that it may be used by the mappers and reducers.
      * @param pigContext
@@ -268,5 +276,32 @@ public class SchemaTupleFrontend {
         SchemaTupleFrontendGenHelper stfgh = new SchemaTupleFrontendGenHelper(pigContext, conf);
         stfgh.generateAll(stf.getSchemasToGenerate());
         stfgh.internalCopyAllGeneratedToDistributedCache();
+
+        Properties prop = pigContext.getProperties();
+        String value = conf.get(GENERATED_CLASSES_KEY);
+        if (value != null) {
+            prop.setProperty(GENERATED_CLASSES_KEY, value);
+        } else {
+            prop.remove(GENERATED_CLASSES_KEY);
+        }
+        value = conf.get(LOCAL_CODE_DIR);
+        if (value != null) {
+            prop.setProperty(LOCAL_CODE_DIR, value);
+        } else {
+            prop.remove(LOCAL_CODE_DIR);
+        }
+    }
+
+    private static PigContext pigContextToReset = null;
+
+    /**
+     * Caches a PigContext object whose properties have had the relevant keys
+     * (GENERATED_CLASSES_KEY, LOCAL_CODE_DIR) set when generated code was
+     * shipped. This is necessary because in some cases, multiple cycles of jobs
+     * might run in the JVM, but the PigContext object may be shared, so we want
+     * to make sure to undo any changes we have made to it.
+     */
+    protected static void lazyReset(PigContext pigContext) {
+        pigContextToReset = pigContext;
     }
 }

Modified: pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Oct  5 21:18:23 2012
@@ -253,7 +253,7 @@ public class FindQuantiles extends EvalF
                     probVec.set(l, new Float(0.0));
                 }
                 // for each partition that this sample item is present in,
-                // compute the fraction of the total occurences for that
+                // compute the fraction of the total occurrences for that
                 // partition - this will be the probability with which we
                 // will pick this partition in the final sort reduce job
                 // for this sample item

Modified: pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Oct  5 21:18:23 2012
@@ -38,7 +38,9 @@ import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
@@ -103,6 +105,8 @@ public class ReadToEndLoader extends Loa
      */
     private InputFormat inputFormat = null;
     
+    private PigContext pigContext;
+
     /**
      * @param wrappedLoadFunc
      * @param conf
@@ -120,6 +124,16 @@ public class ReadToEndLoader extends Loa
         init();
     }
     
+    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+            String inputLocation, int splitIndex, PigContext pigContext) throws IOException {
+        this.wrappedLoadFunc = wrappedLoadFunc;
+        this.inputLocation = inputLocation;
+        this.conf = conf;
+        this.curSplitIndex = splitIndex;
+        this.pigContext = pigContext;
+        init();
+    }
+
     /**
      * This constructor takes an array of split indexes (toReadSplitIdxs) of the 
      * splits to be read.
@@ -143,9 +157,14 @@ public class ReadToEndLoader extends Loa
     
     @SuppressWarnings("unchecked")
     private void init() throws IOException {
+        if (conf != null && pigContext != null) {
+            SchemaTupleBackend.initialize(conf, pigContext, true);
+        }
+
         // make a copy so that if the underlying InputFormat writes to the
         // conf, we don't affect the caller's copy
         conf = new Configuration(conf);
+
         // let's initialize the wrappedLoadFunc 
         Job job = new Job(conf);
         wrappedLoadFunc.setLocation(inputLocation, 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Oct  5 21:18:23 2012
@@ -131,7 +131,6 @@ public class LogToPhyTranslationVisitor 
     @Override
     public void visit(LOLoad loLoad) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-        //        System.err.println("Entering Load");
         // The last parameter here is set to true as we assume all files are
         // splittable due to LoadStore Refactor
         POLoad load = new POLoad(new OperatorKey(scope, nodeGen
@@ -160,7 +159,6 @@ public class LogToPhyTranslationVisitor 
                 throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
             }
         }
-        //        System.err.println("Exiting Load");
     }
 
     @Override
@@ -855,8 +853,20 @@ public class LogToPhyTranslationVisitor 
         for(boolean fl: flatten) {
             flattenList.add(fl);
         }
+        LogicalSchema logSchema = foreach.getSchema();
+        Schema schema = null;
+        if (logSchema != null) {
+            try {
+                schema = Schema.getPigSchema(new ResourceSchema(logSchema));
+            } catch (FrontendException e) {
+                throw new RuntimeException("LogicalSchema in foreach unable to be converted to Schema: " + logSchema, e);
+            }
+        }
+        if (schema != null) {
+            SchemaTupleFrontend.registerToGenerateIfPossible(schema, false, GenContext.FOREACH); //TODO may need to be appendable
+        }
         POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList);
+                .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList, schema);
         poFE.addOriginalLocation(foreach.getAlias(), foreach.getLocation());
         poFE.setResultType(DataType.BAG);
         logToPhyMap.put(foreach, poFE);
@@ -884,6 +894,7 @@ public class LogToPhyTranslationVisitor 
     /**
      * This function takes in a List of LogicalExpressionPlan and converts them to
      * a list of PhysicalPlans
+     *
      * @param plans
      * @return
      * @throws FrontendException

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Oct  5 21:18:23 2012
@@ -820,7 +820,6 @@ public class LogicalPlanBuilder {
                     funcSpec == null ?
                         new FuncSpec(PigStorage.class.getName()) :
                         funcSpec;
-
             loFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(instantiatedFuncSpec);
             String fileNameKey = QueryParserUtils.constructFileNameSignature(filename, instantiatedFuncSpec) + "_" + (loadIndex++);
             absolutePath = fileNameMap.get(fileNameKey);

Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Fri Oct  5 21:18:23 2012
@@ -41,14 +41,13 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
-import org.joda.time.DateTime;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -62,6 +61,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -75,13 +75,11 @@ public class TestSchemaTuple {
 
     @Before
     public void perTestInitialize() {
-        SchemaTupleFrontend.reset();
-        SchemaTupleBackend.reset();
-
         props = new Properties();
-        props.setProperty(SchemaTupleBackend.SHOULD_GENERATE_KEY, "true");
+        props.setProperty(PigConfiguration.SHOULD_USE_SCHEMA_TUPLE, "true");
 
         conf = ConfigurationUtil.toConfiguration(props);
+
         pigContext = new PigContext(ExecType.LOCAL, props);
     }
 
@@ -99,7 +97,7 @@ public class TestSchemaTuple {
 
         udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
         isAppendable = false;
-        context = GenContext.LOAD;
+        context = GenContext.FOREACH;
         SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
 
         udfSchema = Utils.getSchemaFromString("a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))");
@@ -180,11 +178,19 @@ public class TestSchemaTuple {
         udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
         SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
 
+        isAppendable = false;
+        udfSchema = new Schema();
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+        isAppendable = false;
+        udfSchema = new Schema(new FieldSchema(null, DataType.BAG));
+        SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
         // this compiles and "ships"
         SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
 
         //backend
-        SchemaTupleBackend.initialize(conf, ExecType.LOCAL);
+        SchemaTupleBackend.initialize(conf, pigContext);
 
         udfSchema = Utils.getSchemaFromString("a:int");
         isAppendable = false;
@@ -209,7 +215,7 @@ public class TestSchemaTuple {
 
         udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
         isAppendable = false;
-        context = GenContext.LOAD;
+        context = GenContext.FOREACH;
         tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
         putThroughPaces(tf, udfSchema, isAppendable);
 
@@ -302,6 +308,16 @@ public class TestSchemaTuple {
         udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
         tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
         putThroughPaces(tf, udfSchema, isAppendable);
+
+        isAppendable = false;
+        udfSchema = new Schema();
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        assertNull(tf);
+
+        isAppendable = false;
+        udfSchema = new Schema(new FieldSchema(null, DataType.BAG));
+        tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+        putThroughPaces(tf, udfSchema, isAppendable);
     }
 
     private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {

Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Oct  5 21:18:23 2012
@@ -129,8 +129,6 @@ import org.junit.Test;
 
 public class TestBuiltin {
 
-    private String initString = "local";
-
     PigServer pigServer;
 
     // This should only be used when absolutely necessary -- eg, when using ReadToEndLoader.
@@ -304,7 +302,6 @@ public class TestBuiltin {
         expectedMap.put("FloatAvgIntermediate", expectedMap.get("FloatSum"));
 
         // set up input hash
-        try{
             inputMap.put("Integer", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), intInput));
             inputMap.put("IntegerAsLong", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), intAsLong));
             inputMap.put("Long", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), longInput));
@@ -315,11 +312,6 @@ public class TestBuiltin {
             inputMap.put("ByteArrayAsDouble", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), baAsDouble));
             inputMap.put("String", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), stringInput));
 
-        }catch(ExecException e) {
-            e.printStackTrace();
-        }
-
-
         DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestCommit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCommit.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCommit.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCommit.java Fri Oct  5 21:18:23 2012
@@ -17,53 +17,28 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 
-import org.apache.pig.ComparisonFunc;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.Distinct;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.PigFile;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Pair;
-import junit.framework.TestCase;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Before;
+import org.junit.Test;
 
-@RunWith(JUnit4.class)
-public class TestCommit extends TestCase {
+public class TestCommit {
 
     private PigServer pigServer;
 
-    TupleFactory mTf = TupleFactory.getInstance();
+    private static final TupleFactory mTf = TupleFactory.getInstance();
 
     @Before
-    @Override
     public void setUp() throws Exception{
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
     }
@@ -95,11 +70,11 @@ public class TestCommit extends TestCase
             Tuple t = iter.next();
             count++;
             if (count == 1) {
-                assertTrue(t.get(0).equals(expected1.get(0)));
-                assertTrue(t.get(1).equals(expected1.get(1)));
+                assertEquals(t.get(0), expected1.get(0));
+                assertEquals(t.get(1), expected1.get(1));
             } else if (count == 2){
-                assertTrue(t.get(0).equals(expected2.get(0)));
-                assertTrue(t.get(1).equals(expected2.get(1)));
+                assertEquals(t.get(0), expected2.get(0));
+                assertEquals(t.get(1), expected2.get(1));
             }
         }
         assertEquals(count, 2);
@@ -146,6 +121,7 @@ public class TestCommit extends TestCase
         }
         pigServer.deleteFile("testCheckin2-output.txt");
         assertEquals(count, 2);
-        assertTrue(contain1 && contain2);
+        assertTrue(contain1);
+        assertTrue(contain2);
     }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java Fri Oct  5 21:18:23 2012
@@ -18,7 +18,7 @@
 
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -32,35 +32,22 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.newplan.Operator;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 
 public class TestExampleGenerator {
 
-    
     static PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
 
-   
     static int MAX = 100;
     static String A, B;
     static  File fileA, fileB;
     
-    {
-        try {
-            pigContext.connect();
-        } catch (ExecException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-    }
-
     @BeforeClass
     public static void oneTimeSetup() throws Exception {
-       
+        pigContext.connect();
 
         fileA = File.createTempFile("dataA", ".dat");
         fileB = File.createTempFile("dataB", ".dat");
@@ -100,7 +87,7 @@ public class TestExampleGenerator {
         pigserver.registerQuery(query);
         Map<Operator, DataBag> derivedData = pigserver.getExamples("A");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
 
@@ -116,7 +103,7 @@ public class TestExampleGenerator {
         pigserver.registerQuery(query);
         Map<Operator, DataBag> derivedData = pigserver.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
 
@@ -132,7 +119,7 @@ public class TestExampleGenerator {
         pigserver.registerQuery(query);
         Map<Operator, DataBag> derivedData = pigserver.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     @Test
@@ -153,7 +140,7 @@ public class TestExampleGenerator {
         pigserver.registerQuery(query);
         Map<Operator, DataBag> derivedData = pigserver.getExamples("E");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
     
@@ -167,7 +154,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     //see PIG-2170
@@ -181,7 +168,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     @Test
@@ -195,7 +182,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -208,7 +195,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("E");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -221,7 +208,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("E");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -233,7 +220,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = cogroup A by (x, y), B by (x, y);");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -244,7 +231,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = cogroup A by x, B by x;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -254,7 +241,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("B = group A by x;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
     
@@ -266,7 +253,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
 
@@ -279,7 +266,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("D = foreach C generate group, COUNT(B);");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("D");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
     
@@ -292,7 +279,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("D = UNION B, C;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("D");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
     
@@ -304,7 +291,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = foreach B { FA = filter A by y == 6; generate group, COUNT(FA);};");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
 
@@ -316,7 +303,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = foreach B { FA = filter A by y == 6; DA = DISTINCT FA; generate group, COUNT(DA);};");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
 
     }
     
@@ -328,7 +315,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = union A, B;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -338,7 +325,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("B = DISTINCT A;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     @Test
@@ -349,7 +336,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("C = CROSS A, B;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     @Test
@@ -359,7 +346,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("B = limit A 5;");
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     //see PIG-2275
@@ -373,7 +360,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
     
     @Test
@@ -387,7 +374,7 @@ public class TestExampleGenerator {
 
         Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
 
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
     @Test
@@ -405,7 +392,7 @@ public class TestExampleGenerator {
         pigServer.registerQuery("store D into '" +  out.getAbsolutePath() + "';");
         Map<Operator, DataBag> derivedData = pigServer.getExamples(null);
     
-        assertTrue(derivedData != null);
+        assertNotNull(derivedData);
     }
 
 }

Modified: pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLoad.java Fri Oct  5 21:18:23 2012
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -30,8 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
@@ -55,22 +56,17 @@ import org.apache.pig.parser.ParserExcep
 import org.apache.pig.parser.QueryParserDriver;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.TestHelper;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 
-@RunWith(JUnit4.class)
-public class TestLoad extends junit.framework.TestCase {
+public class TestLoad {
 
     PigContext pc;
     PigServer[] servers;
     
     static MiniCluster cluster = MiniCluster.buildCluster();
     
-    @Override
     @Before
     public void setUp() throws Exception {
         FileLocalizer.deleteTempFiles();
@@ -80,11 +76,6 @@ public class TestLoad extends junit.fram
         };       
     }
         
-    @Override
-    @After
-    public void tearDown() throws Exception {
-    }
-
     @Test
     public void testGetNextTuple() throws IOException {
         pc = servers[0].getPigContext();
@@ -264,7 +255,7 @@ public class TestLoad extends junit.fram
         LogicalPlan lp = Util.buildLp(servers[1], query);
         LOLoad load = (LOLoad) lp.getSources().get(0);
         nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
-        Assert.assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
+        assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
     }
     
     @SuppressWarnings("unchecked")
@@ -297,7 +288,7 @@ public class TestLoad extends junit.fram
             }
             Collections.sort(expectedBasedOnNumberOfInputs);
             Collections.sort(actual);
-            Assert.assertEquals(expectedBasedOnNumberOfInputs, actual);
+            assertEquals(expectedBasedOnNumberOfInputs, actual);
         } finally {
             for(int i = 0; i < inputFileNames.length; i++) {
                 Util.deleteFile(pc, inputFileNames[i]);
@@ -325,24 +316,24 @@ public class TestLoad extends junit.fram
             
             String query = "a = load '"+orig+"';";
             LogicalPlan lp = builder.parse(query);
-            Assert.assertTrue(lp.size()>0);
+            assertTrue(lp.size()>0);
             Operator op = lp.getSources().get(0);
             
-            Assert.assertTrue(op instanceof LOLoad);
+            assertTrue(op instanceof LOLoad);
             LOLoad load = (LOLoad)op;
     
             String p = load.getFileSpec().getFileName();
             System.err.println("DEBUG: p:" + p + " expected:" + expected +", exectype:" + pc.getExecType());
             if(noConversionExpected) {
-                Assert.assertEquals(p, expected);
+                assertEquals(p, expected);
             } else  {
                 if (pc.getExecType() == ExecType.MAPREDUCE) {
-                    Assert.assertTrue(p.matches(".*hdfs://[0-9a-zA-Z:\\.]*.*"));
-                    Assert.assertEquals(p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/", "/"),
+                    assertTrue(p.matches(".*hdfs://[0-9a-zA-Z:\\.]*.*"));
+                    assertEquals(p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/", "/"),
                             expected);
                 } else {
-                    Assert.assertTrue(p.matches(".*file://[0-9a-zA-Z:\\.]*.*"));
-                    Assert.assertEquals(p.replaceAll("file://[0-9a-zA-Z:\\.]*/", "/"),
+                    assertTrue(p.matches(".*file://[0-9a-zA-Z:\\.]*.*"));
+                    assertEquals(p.replaceAll("file://[0-9a-zA-Z:\\.]*/", "/"),
                             expected);
                 }
             }