You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/09 23:40:15 UTC

svn commit: r908262 [1/2] - in /hadoop/pig/branches/load-store-redesign: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/execu...

Author: pradeepkth
Date: Tue Feb  9 22:40:13 2010
New Revision: 908262

URL: http://svn.apache.org/viewvc?rev=908262&view=rev
Log:
svn merge -r902253:908177 http://svn.apache.org/repos/asf/hadoop/pig/trunk

Added:
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/InputSchemaUDF.java
Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/piglatin_ref2.xml
    hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/executionengine/ExecJob.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCollectedGroup.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocal2.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMRCompiler.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestNullConstant.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestOperatorPlan.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSchema.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStreamingLocal.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Tue Feb  9 22:40:13 2010
@@ -53,6 +53,18 @@
 
 IMPROVEMENTS
 
+PIG-1224: Collected group should change to use new (internal) bag (ashutoshc)
+
+PIG-1046: join algorithm specification is within double quotes (ashutoshc)
+
+PIG-1209: Port POJoinPackage to proactively spill (ashutoshc)
+
+PIG-1190: Handling of quoted strings in pig-latin/grunt commands (ashutoshc)
+
+PIG-1214: Pig 0.6 Docs fixes (chandec via olgan)
+
+PIG-977:  exit status does not account for JOB_STATUS.TERMINATED (ashutoshc)
+
 PIG-1192: Pig 0.6 Docs fixes (chandec via olgan)
 
 PIG-1177: Pig 0.6 Docs - Zebra docs (chandec via olgan)
@@ -107,6 +119,18 @@
 
 BUG FIXES
 
+PIG-1154: Local Mode fails when hadoop config directory is specified in 
+            classpath (ankit.modi via gates)
+
+PIG-1124: Unable to set Custom Job Name using the -Dmapred.job.name parameter (ashutoshc)
+
+PIG-1213: Schema serialization is broken (pradeepkth)
+
+PIG-1194:  ERROR 2055: Received Error while processing the map plan (rding via ashutoshc)
+
+PIG-1204:  Pig hangs when joining two streaming relations in local mode
+(rding)
+
 PIG-1191:  POCast throws exception for certain sequences of LOAD, FILTER,
 			FORACH (pradeepkth via gates)
 
@@ -173,6 +197,14 @@
 PIG-1176: Column Pruner issues in union of loader with and without schema
 (daijy)
 
+PIG-1184: PruneColumns optimization does not handle the case of foreach
+flatten correctly if flattened bag is not used later (daijy)
+
+PIG-1189: StoreFunc UDF should ship to the backend automatically without
+"register" (daijy)
+
+PIG-1212: LogicalPlan.replaceAndAddSucessors produce wrong result when successors are null (daijy)
+
 Release 0.6.0 - Unreleased
 
 INCOMPATIBLE CHANGES
@@ -385,6 +417,11 @@
 
 PIG-1195: POSort should take care of sort order (daijy)
 
+PIG-1210: fieldsToRead send the same fields more than once in some cases (daijy)
+
+PIG-1231: DefaultDataBagIterator.hasNext() should be idempotent in all cases
+(daijy)
+
 Release 0.5.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/piglatin_ref2.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/piglatin_ref2.xml?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/piglatin_ref2.xml (original)
+++ hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/piglatin_ref2.xml Tue Feb  9 22:40:13 2010
@@ -8710,7 +8710,7 @@
    <informaltable frame="all">
       <tgroup cols="1"><tbody><row>
             <entry>
-               <para>PigStorage(field_delimiter)        </para>
+               <para>PigStorage(field_delimiter) </para>
             </entry>
          </row></tbody></tgroup>
    </informaltable></section>
@@ -8732,16 +8732,15 @@
    
    <section>
    <title>Usage</title>
-   <para>PigStorage works with structured text files in human-readable UTF-8 format. PigStorage also works with simple and complex data types and is the default function for the LOAD and STORE operators.</para>
-   <itemizedlist>
-      <listitem>
-         <para>For load statements, PigStorage expects data to be formatted as delimiter-separated fields and newline-separated records ('\n'). </para>
-      </listitem>
-      <listitem>
-         <para>For store statements, PigStorage outputs data as delimiter-separated fields and newline-separated records ('\n'). </para>
-      </listitem>
-   </itemizedlist>
-   <para>For both load and store statements the default field delimiter is the tab character ('\t'). You can use other characters as field delimiters, but separators such as ^A or Ctrl-A should be represented in Unicode (\u0001) using UTF-16 encoding (see Wikipedia <ulink url="http://en.wikipedia.org/wiki/ASCII">ASCII</ulink>, <ulink url="http://en.wikipedia.org/wiki/Unicode">Unicode</ulink>, and <ulink url="http://en.wikipedia.org/wiki/UTF-16">UTF-16</ulink>).</para>
+   <para>PigStorage is the default function for the LOAD and STORE operators. PigStorage works with structured text files (in human-readable UTF-8 format) and bzip compressed text files. PigStorage also works with simple and complex data types.</para>
+
+  <para>Load statements – PigStorage expects data to be formatted using field delimiters, either the tab character ('\t') or another specified character.</para>
+
+   <para>Store statements – PigStorage outputs data using field delimiters, either the tab character ('\t') or another specified character, and the line feed record delimiter ('\n').</para>
+
+   <para>Field Delimiters – For load and store statements the default field delimiter is the tab character ('\t'). You can use other characters as field delimiters, but separators such as ^A or Ctrl-A should be represented in Unicode (\u0001) using UTF-16 encoding (see Wikipedia <ulink url="http://en.wikipedia.org/wiki/ASCII">ASCII</ulink>, <ulink url="http://en.wikipedia.org/wiki/Unicode">Unicode</ulink>, and <ulink url="http://en.wikipedia.org/wiki/UTF-16">UTF-16</ulink>).</para>
+   
+   <para>Record Delimiters – For load statements, Pig interprets the line feed ('\n'), carriage return ('\r' or CTRL-M), and combined CR + LF ('\r\n') characters as record delimiters (do not use these characters as field delimiters). For store statements, Pig uses the line feed ('\n') character as the record delimiter. For both load and store statements, if the file is a bzip file (ending in .bz or .bz2), Pig uses the line feed ('\n') character as the record delimiter.</para>
    </section>
    
    <section>

Modified: hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml (original)
+++ hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml Tue Feb  9 22:40:13 2010
@@ -102,8 +102,14 @@
  </section>
 <!--end example: map-side data-->
     
-    
-    
+    <!--sorting data-->
+    <section>
+   <title>Sorting Data</title>
+   <p>
+   Pig allows you to sort data by ascending (ASC) or descending (DESC) order (for more information, see <a href="piglatin_ref2.html#ORDER">ORDER</a>). Currently, Zebra supports tables that are sorted in ascending order. Zebra does not support tables that are sorted in descending order; if Zebra encounters a table to be stored that is sorted in descending order, Zebra will issue a warning and store the table as an unsorted table.</p>
+     </section>
+     <!--end sorting data-->
+     
     <!--example: storing data-->
     <section>
    <title>Storing Data</title>

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/executionengine/ExecJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/executionengine/ExecJob.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/executionengine/ExecJob.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/executionengine/ExecJob.java Tue Feb  9 22:40:13 2010
@@ -20,7 +20,6 @@
 
 import java.util.Iterator;
 import java.util.Properties;
-import java.util.Map;
 import java.io.OutputStream;
 
 import org.apache.pig.data.Tuple;
@@ -35,12 +34,8 @@
 public interface ExecJob {
 
     public enum JOB_STATUS {
-        QUEUED,
-        RUNNING,
-        SUSPENDED,
-        TERMINATED,
         FAILED,
-        COMPLETED,
+        COMPLETED
     }
 
     public static final String PROGRESS_KEY = "job.progress";
@@ -75,7 +70,7 @@
      * 
      * @return configuration information for the execution engine
      */    
-    public Properties getContiguration();
+    public Properties getConfiguration();
 
     /**
      * Can be information about the state (not submitted, e.g. the execute method

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Feb  9 22:40:13 2010
@@ -18,11 +18,13 @@
 
 package org.apache.pig.backend.hadoop.executionengine;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketImplFactory;
+import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.ArrayList;
@@ -51,10 +53,10 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -64,6 +66,10 @@
     public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
     private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
     
+    private static final String MAPRED_SITE = "mapred-site.xml";
+    private static final String HDFS_SITE = "hdfs-site.xml";
+    private static final String MAPRED_SYS_DIR = "mapred.system.dir";
+    
     private final Log log = LogFactory.getLog(getClass());
     public static final String LOCAL = "local";
     
@@ -158,6 +164,30 @@
         } else {
             properties.setProperty(JOB_TRACKER_LOCATION, LOCAL );
             properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
+            
+            Configuration testConf = new Configuration();
+            ClassLoader cl = testConf.getClassLoader();
+            URL mapred_site = cl.getResource( MAPRED_SITE );
+            URL hdfs_site = cl.getResource( HDFS_SITE );
+            
+            if( mapred_site != null || hdfs_site != null ) {
+                log.warn( "Passing Hadoop Site Configurations in classpath " +
+                		"is not recommended for Local Mode" );
+            }
+
+            // This is one such case: check whether the mapred.system.dir
+            // directory exists, so that we can print a clear error message if it does not.
+            String newMapredSystemDir = testConf.get( MAPRED_SYS_DIR, "" );
+            Configuration defaultConf = new Configuration(false);
+            defaultConf.addResource("core-default.xml");
+            defaultConf.addResource("mapred-default.xml");
+            if( defaultConf.get(MAPRED_SYS_DIR, "").compareTo(newMapredSystemDir) != 0 ) {
+                File systemDir = new File(newMapredSystemDir);
+                if( ! systemDir.exists() ) {
+                    throw new ExecException( MAPRED_SYS_DIR + ": " + newMapredSystemDir 
+                            + " mentioned in the configuration does not exist");
+                }
+            }
         }
         
         cluster = properties.getProperty(JOB_TRACKER_LOCATION);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Tue Feb  9 22:40:13 2010
@@ -21,7 +21,6 @@
 import java.io.OutputStream;
 import java.io.InputStream;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -143,7 +142,7 @@
         };
     }
 
-    public Properties getContiguration() {
+    public Properties getConfiguration() {
         Properties props = new Properties();
         return props;
     }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb  9 22:40:13 2010
@@ -398,8 +398,12 @@
             conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
             conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer
-            if (pigContext.getProperties().getProperty(PigContext.JOB_NAME) != null)
-                nwJob.setJobName(pigContext.getProperties().getProperty(PigContext.JOB_NAME));
+           
+            // if user specified the job name using -D switch, Pig won't reset the name then.
+            if (System.getProperty("mapred.job.name") == null && 
+                    pigContext.getProperties().getProperty(PigContext.JOB_NAME) != null){
+                nwJob.setJobName(pigContext.getProperties().getProperty(PigContext.JOB_NAME));                
+            }
     
             if (pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY) != null) {
                 // If the job priority was set, attempt to get the corresponding enum value

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Feb  9 22:40:13 2010
@@ -731,6 +731,8 @@
             storeToMapReduceMap.put(op, curMROp);
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
+            if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
+                curMROp.UDFs.add(op.getSFile().getFuncSpec().toString());
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Tue Feb  9 22:40:13 2010
@@ -20,8 +20,10 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -105,7 +107,7 @@
     // Sort order for secondary keys;
     boolean[] secondarySortOrder;
 
-    public List<String> UDFs;
+    public Set<String> UDFs;
     
     // Indicates if a UDF comparator is used
     boolean isUDFComparatorUsed = false;
@@ -142,7 +144,7 @@
         mapPlan = new PhysicalPlan();
         combinePlan = new PhysicalPlan();
         reducePlan = new PhysicalPlan();
-        UDFs = new ArrayList<String>();
+        UDFs = new HashSet<String>();
         nig = NodeIdGenerator.getGenerator();
         scope = k.getScope();
     }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Tue Feb  9 22:40:13 2010
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -31,6 +32,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -69,6 +71,8 @@
     
     private Object prevKey = null;
     
+    private boolean useDefaultBag = false;
+    
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
     }
@@ -199,8 +203,24 @@
 
             // the first time, just create a new buffer and continue.
             if (prevKey == null && outputBag == null) {
+                
+                if (PigMapReduce.sJobConf != null) {
+                    String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                    if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                        useDefaultBag = true;
+                    }
+                }
                 prevKey = curKey;
-                outputBag = BagFactory.getInstance().newDefaultBag();
+                outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag() 
+                // In a very rare case if there is a POStream after this 
+                // POCollectedGroup in the pipeline and is also blocking the pipeline;
+                // constructor argument should be 2. But for one obscure
+                // case we don't want to pay the penalty all the time.
+                        
+                // Additionally, if there is a merge join(on a different key) following POCollectedGroup
+                // default bags should be used. But since we don't allow anything
+                // before Merge Join currently we are good.        
+                        : new InternalCachedBag(1);
                 outputBag.add((Tuple)tup.get(1));
                 continue;
             }
@@ -224,7 +244,8 @@
             res.result = tup2;
                
             prevKey = curKey;
-            outputBag = BagFactory.getInstance().newDefaultBag();
+            outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag() 
+                    : new InternalCachedBag(1);
             outputBag.add((Tuple)tup.get(1));
 
             return res;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Tue Feb  9 22:40:13 2010
@@ -20,12 +20,15 @@
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -42,6 +45,8 @@
     private boolean lastInputTuple = false;
     private static final Tuple t1 = null;
     private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+    private boolean firstTime = true;
+    private boolean useDefaultBag = false;
 
     public static final String DEFAULT_CHUNK_SIZE = "1000";
 
@@ -98,6 +103,16 @@
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
+        
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConf != null) {
+                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
         // if a previous call to foreach.getNext()
         // has still not returned all output, process it
         if (forEach.processingPlan)
@@ -126,7 +141,12 @@
             //Put n-1 inputs into bags
             dbs = new DataBag[numInputs];
             for (int i = 0; i < numInputs; i++) {
-                dbs[i] = mBagFactory.newDefaultBag();
+                dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag() 
+                // In a very rare case if there is a POStream after this 
+                // POJoinPackage in the pipeline and is also blocking the pipeline;
+                // constructor argument should be 2 * numInputs. But for one obscure
+                // case we don't want to pay the penalty all the time.        
+                        : new InternalCachedBag(numInputs);                    
             }
             
             //For each Nullable tuple in the input, put it

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Tue Feb  9 22:40:13 2010
@@ -57,7 +57,9 @@
 
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-    transient private Log log = LogFactory.getLog(getClass());
+    private static Log log = LogFactory.getLog(POLocalRearrange.class);
+    
+    private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
     
@@ -251,7 +253,7 @@
     public Result getNext(Tuple t) throws ExecException {
         
         Result inp = null;
-        Result res = null;
+        Result res = ERR_RESULT;
         while (true) {
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
@@ -308,9 +310,16 @@
                 case DataType.TUPLE:
                     res = op.getNext(dummyTuple);
                     break;
+                default:
+                    log.error("Invalid result type: " + DataType.findType(op.getResultType()));
+                    break;
                 }
-                if(res.returnStatus!=POStatus.STATUS_OK)
+                
+                // allow null as group by key
+                if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
                     return new Result();
+                }
+              
                 resLst.add(res);
             }
             
@@ -349,15 +358,24 @@
                     case DataType.TUPLE:
                         res = op.getNext(dummyTuple);
                         break;
+                    default:
+                        log.error("Invalid result type: " + DataType.findType(op.getResultType()));
+                        break;
                     }
-                    if(res.returnStatus!=POStatus.STATUS_OK)
+                    
+                    // allow null as group by key
+                    if (res.returnStatus != POStatus.STATUS_OK && res.returnStatus != POStatus.STATUS_NULL) {
                         return new Result();
+                    }
+                    
                     secondaryResLst.add(res);
                 }
             }
+            
             // If we are using secondary sort key, our new key is:
-            // (nullable, index, (key, secondary key), value)
-            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
+            // (nullable, index, (key, secondary key), value)             
+            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);            
+            res.returnStatus = POStatus.STATUS_OK;
             
             return res;
         }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Feb  9 22:40:13 2010
@@ -115,6 +115,10 @@
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+    
+    private boolean firstTime = true;
+    
+    private boolean useDefaultBag = false;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -211,6 +215,17 @@
     @Override
     public Result getNext(Tuple t) throws ExecException {
         Tuple res;
+        
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConf != null) {
+                String bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+        
         if(distinct) {
             // only set the key which has the whole
             // tuple 
@@ -232,20 +247,14 @@
                 
             } else {
                 // create bag to pull all tuples out of iterator
-                String bagType = null;
-                if (PigMapReduce.sJobConf != null) {
-                       bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
-                   }
-                                
-
-                for (int i = 0; i < numInputs; i++) {        		          	           		
-                    if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
-                           dbs[i] = mBagFactory.newDefaultBag();           			
-                       } else {
-                        dbs[i] = new InternalCachedBag(numInputs);
-                    }
-                }      
-                               
+                for (int i = 0; i < numInputs; i++) {
+                    dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                    // In a very rare case if there is a POStream after this 
+                    // POPackage in the pipeline and is also blocking the pipeline;
+                    // constructor argument should be 2 * numInputs. But for one obscure
+                    // case we don't want to pay the penalty all the time.                
+                            : new InternalCachedBag(numInputs);                    
+                }                               
                 //For each indexed tup in the inp, sort them
                 //into their corresponding bags based
                 //on the index

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Tue Feb  9 22:40:13 2010
@@ -40,6 +40,8 @@
 
 public class POStream extends PhysicalOperator {
     private static final long serialVersionUID = 2L;
+    
+    private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
 
     private String executableManagerStr;            // String representing ExecutableManager to use
     transient private ExecutableManager executableManager;    // ExecutableManager to use 
@@ -155,7 +157,7 @@
                     // getNext() in POStream should never be called. So
                     // we don't need to set any flag noting we saw all output
                     // from binary
-                    r.returnStatus = POStatus.STATUS_EOP;
+                    r = EOP_RESULT;
                 }
                 return(r);
             }
@@ -190,7 +192,7 @@
                             // getNext() in POStream should never be called. So
                             // we don't need to set any flag noting we saw all output
                             // from binary
-                            r.returnStatus = POStatus.STATUS_EOP;
+                            r = EOP_RESULT;
                         }
                     }
                     
@@ -204,7 +206,7 @@
                     // So once we send this EOP down, getNext() in POStream
                     // should never be called. So we don't need to set any 
                     // flag noting we saw all output from binary
-                    r.returnStatus = POStatus.STATUS_EOP;
+                    r = EOP_RESULT;
                 }
                 return r;
             } else {
@@ -218,7 +220,7 @@
                     // So we can send an EOP to the successor in
                     // the pipeline and also note this condition
                     // for future calls
-                    r.returnStatus = POStatus.STATUS_EOP;
+                    r = EOP_RESULT;
                     allOutputFromBinaryProcessed  = true;
                 }
                 return r;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java Tue Feb  9 22:40:13 2010
@@ -50,6 +50,8 @@
 
     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
     
+    boolean hasCachedTuple = false;
+    
     public DefaultDataBag() {
         mContents = new ArrayList<Tuple>();
     }
@@ -74,6 +76,7 @@
     }
     
     public Iterator<Tuple> iterator() {
+        hasCachedTuple = false;
         return new DefaultDataBagIterator();
     }
 
@@ -150,9 +153,12 @@
         }
 
         public boolean hasNext() { 
-            // See if we can find a tuple.  If so, buffer it.
+            // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple
+            if (hasCachedTuple)
+                return (mBuf != null);
             mBuf = next();
-            return mBuf != null;
+            hasCachedTuple = true;
+            return (mBuf != null);
         }
 
         public Tuple next() {
@@ -161,9 +167,9 @@
             if ((mCntr++ & 0x3ff) == 0) reportProgress();
 
             // If there's one in the buffer, use that one.
-            if (mBuf != null) {
+            if (hasCachedTuple) {
                 Tuple t = mBuf;
-                mBuf = null;
+                hasCachedTuple = false;
                 return t;
             }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Feb  9 22:40:13 2010
@@ -803,28 +803,23 @@
         if (mSchema == null)
             return null;
         
-        if (mSchema.size()<=column)
-        {
-            return null;
-        }
-        
         return mSchemaPlanMapping.get(column);
     }
     
     public boolean isInputFlattened(int column) throws FrontendException {
-        LogicalPlan plan = getRelevantPlan(column);
-        if (plan==null) {
-            int errCode = 2195;
-            throw new FrontendException("Fail to get foreach plan for input column "+column,
-                    errCode, PigException.BUG);
-        }
-        int index = mForEachPlans.indexOf(plan);
-        if (index==-1) {
-            int errCode = 2195;
-            throw new FrontendException("Fail to get foreach plan for input column "+column,
-                    errCode, PigException.BUG);
+        for (int i=0;i<mForEachPlans.size();i++) {
+            LogicalPlan forEachPlan = mForEachPlans.get(i);
+            TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(forEachPlan);
+            projectFinder.visit();
+            for (LOProject project : projectFinder.getProjectList()) {
+                if (project.getCol()==column) {
+                    if (mFlatten.get(i))
+                        return true;
+                }
+            }
         }
-        return mFlatten.get(index);
+
+        return false;
     }
     
     @Override
@@ -866,7 +861,8 @@
         ArrayList<Pair<Integer, Integer>> inputList = new ArrayList<Pair<Integer, Integer>>();
         for (LOProject project : projectFinder.getProjectSet()) {
             for (int inputColumn : project.getProjection()) {
-                inputList.add(new Pair<Integer, Integer>(0, inputColumn));
+                if (!inputList.contains(new Pair<Integer, Integer>(0, inputColumn)))
+                    inputList.add(new Pair<Integer, Integer>(0, inputColumn));
             }
         }
         if (inputList.size()==0)
@@ -942,16 +938,16 @@
             {
                 continue;
             }
-            boolean allPruned = true;
+            boolean anyPruned = false;
             for (LOProject loProject : projectFinder.getProjectSet()) {
                 Pair<Integer, Integer> pair = new Pair<Integer, Integer>(0,
                         loProject.getCol());
-                if (!columns.contains(pair)) {
-                    allPruned = false;
+                if (columns.contains(pair)) {
+                    anyPruned = true;
                     break;
                 }
             }
-            if (allPruned) {
+            if (anyPruned) {
                 planToRemove.add(i);
             }
         }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Feb  9 22:40:13 2010
@@ -366,20 +366,40 @@
                     }
                 }
             }
-
-                
+            
             // Merge with required input fields of this logical operator.
             // RequiredInputFields come from two sources, one is mapping from required output to input, 
             // the other is from the operator itself. Here we use getRequiredFields to get the second part,
             // and merge with the first part
             List<RequiredFields> requiredFieldsListOfLOOp;
             
-            // For LOForEach, requiredFields is not really required fields. Here required fields means the input
-            // fields required by the entire output columns, such as filter condition in LOFilter, group columns in LOCoGroup.
-            // For LOForEach, output columns are generated by the foreach plan it belongs to, there is nothing globally required.
-            // So we need to fix the semantic gap here. If the operator is LOForEach, requiredFields is null.
+            // For LOForEach, requiredFields all flattened fields. Even the flattened fields get pruned, 
+            // it may expand the number of rows in the result. So flattened fields shall not be pruned.
+            // LOForEach.getRequiredFields does not give the required fields. RequiredFields means that field
+            // is required by all the outputs. The pipeline does not work correctly without that field. 
+            // LOForEach.getRequiredFields give all the input fields referred in the LOForEach statement, but those
+            // fields can still be pruned (which means, not required)
+            // Eg:
+            // B = foreach A generate a0, a1, a2+a3;
+            // LOForEach.getRequiredFields gives (a0, a1, a2, a3);
+            // However, a2,a3 can be pruned if we do not need the a2+a3 for LOForEach.
+            // So here, we do not use LOForEach.getRequiredFields, instead, any flattened fields are required fields
+            if (rlo instanceof LOForEach) {
+                List<Pair<Integer, Integer>> flattenedInputs = new ArrayList<Pair<Integer, Integer>>();
+                for (int i=0;i<rlo.getSchema().size();i++) {
+                    if (((LOForEach)rlo).isInputFlattened(i)) {
+                        flattenedInputs.add(new Pair<Integer, Integer>(0, i));
+                    }
+                }
+                if (!flattenedInputs.isEmpty()) {
+                    requiredFieldsListOfLOOp = new ArrayList<RequiredFields>();
+                    requiredFieldsListOfLOOp.add(new RequiredFields(flattenedInputs));
+                }
+                else
+                    requiredFieldsListOfLOOp = null;
+            }
             // For LOCross/LOUnion, actually we do not require any field here
-            if (rlo instanceof LOForEach || rlo instanceof LOCross || rlo instanceof LOUnion)
+            else if (rlo instanceof LOCross || rlo instanceof LOUnion)
                 requiredFieldsListOfLOOp = null;
             else
                 requiredFieldsListOfLOOp = rlo.getRequiredFields();
@@ -792,5 +812,5 @@
             String msg = "Unable to prune plan";
             throw new OptimizerException(msg, errCode, PigException.BUG, e);
         }
-    }
+    }    
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Feb  9 22:40:13 2010
@@ -237,6 +237,31 @@
 		return cogroup;
 	}
 	
+    private LogicalOperator parseUsingForGroupBy(String modifier, ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+
+      if(modifier.equalsIgnoreCase("collected")){
+            if (gis.size() != 1) {
+                throw new ParseException("Collected group is only supported for single input");  
+                }
+            if (!isColumnProjectionsOrStar(gis.get(0))) {
+                throw new ParseException("Collected group is only supported for columns or star projection");
+                }
+            LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
+            cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
+            return cogroup;
+        }
+
+        else if (modifier.equalsIgnoreCase("regular")){
+            LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
+            cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
+            return cogroup;
+        }
+
+        else{
+            throw new ParseException("Only COLLECTED or REGULAR are valid GROUP modifiers.");
+        }
+    }
+    
 	/**
 	 * Join parser. 
 	 */
@@ -352,7 +377,42 @@
 		return foreach;
 	}
 
-	void assertAtomic(LogicalOperator spec, boolean desiredAtomic) throws ParseException{
+    private LogicalOperator parseUsingForJoin(String modifier, ArrayList<CogroupInput> gis,
+                LogicalPlan lp, boolean isFullOuter, boolean isRightOuter, boolean isOuter) throws
+                ParseException, PlanException{
+
+              if (modifier.equalsIgnoreCase("repl") || modifier.equalsIgnoreCase("replicated")) {
+              if(isFullOuter || isRightOuter) {
+                  throw new ParseException("Replicated join does not support (right|full) outer joins");
+              }
+                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); 
+                    joinOp.pinOption(LOJoin.OPTION_JOIN);
+                    return joinOp; 
+            }
+             else if (modifier.equalsIgnoreCase("hash") || modifier.equalsIgnoreCase("default")) {
+                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
+                    joinOp.pinOption(LOJoin.OPTION_JOIN);
+                    return joinOp;
+            }
+            else if (modifier.equalsIgnoreCase("skewed")) {
+                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
+                    joinOp.pinOption(LOJoin.OPTION_JOIN);
+                    return joinOp;
+            }
+             else if (modifier.equalsIgnoreCase("merge")) {
+                 if(isOuter) {
+                        throw new ParseException("Merge join does not support (left|right|full) outer joins");
+                    }
+                    LogicalOperator joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
+                    joinOp.pinOption(LOJoin.OPTION_JOIN);
+                    return joinOp; 
+            }
+            else{
+                    throw new ParseException("Only REPL, REPLICATED, HASH, SKEWED and MERGE are vaild JOIN modifiers.");
+            }
+    }
+
+    void assertAtomic(LogicalOperator spec, boolean desiredAtomic) throws ParseException{
 		Boolean isAtomic = null;
 		if ( spec instanceof LOConst || 
 			(spec instanceof LOUserFunc &&
@@ -1658,29 +1718,22 @@
     ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
     LogicalOperator cogroup = null; 
     log.trace("Entering CoGroupClause");
+    Token t;
 }
 {
-
     (gi = GroupItem(lp) { gis.add(gi); }
         ("," gi = GroupItem(lp) { gis.add(gi); })*
-        (
-            [<USING> ("\"collected\"" { 
-                if (gis.size() != 1) {
-                    throw new ParseException("Collected group is only supported for single input");  
-                }
-                if (!isColumnProjectionsOrStar(gis.get(0))) {
-                    throw new ParseException("Collected group is only supported for columns or star projection");
-                }
-                cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
-                cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
-                }
-                |"\"regular\"" {
-                    cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
-                    cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
-                }
-                )
-            ]                                                                        
-        )
+        ([ <USING> (
+          (t = < QUOTEDSTRING> { cogroup = parseUsingForGroupBy(unquote (t.image), gis, lp); })
+         |("\"collected\"") {
+            log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes."); 
+            cogroup = parseUsingForGroupBy("collected", gis, lp);
+            }
+         |("\"regular\"") {
+            log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes."); 
+            cogroup = parseUsingForGroupBy("regular", gis, lp);
+            }
+        )])
     )
 
     {
@@ -1978,6 +2031,7 @@
 	boolean isRightOuter = false;
 	boolean isFullOuter = false;
 	boolean isOuter = false;
+	Token t;
 }
 {
 	(gi = JoinItem(lp) { gis.add(gi); }
@@ -2026,43 +2080,25 @@
 		
 	}
 	// For all types of join we create LOJoin and mark what type of join it is.
-	(
-		[<USING> ("\"replicated\"" { 
-	          if(isFullOuter || isRightOuter) {
-	              throw new ParseException("Replicated join does not support (right|full) outer joins");
-	          }
-				    joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); 
-				    joinOp.pinOption(LOJoin.OPTION_JOIN); 
-			    }
-			| "\"repl\"" {
-                  if(isFullOuter || isRightOuter) {
-	                    throw new ParseException("Replicated join does not support (right|full) outer joins");
-	          }
-				    joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
-				    joinOp.pinOption(LOJoin.OPTION_JOIN);
-                  }
-		    |"\"skewed\"" {
-		    	    joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
-		    	    joinOp.pinOption(LOJoin.OPTION_JOIN);
+	([<USING> (
+		  (t = <QUOTEDSTRING> { joinOp = parseUsingForJoin(unquote(t.image), gis, lp, isFullOuter, isRightOuter, isOuter);})
+        | ("\"repl\"" | "\"replicated\"")  {
+		      log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
+              joinOp = parseUsingForJoin("replicated", gis, lp, isFullOuter, isRightOuter, isOuter);
+		  }
+	    | ("\"skewed\"") {
+              log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
+              joinOp = parseUsingForJoin("skewed", gis, lp, isFullOuter, isRightOuter, isOuter);
 		    	}
-		    |"\"merge\"" { 
-		    	    if(isOuter) {
-                        throw new ParseException("Merge join does not support (left|right|full) outer joins");
-                    }
-		    	    joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE);
-		    	    joinOp.pinOption(LOJoin.OPTION_JOIN); 
-		    	}
-		    |"\"hash\"" {
-		    		joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
-		    		joinOp.pinOption(LOJoin.OPTION_JOIN);
+		| ("\"merge\"") { 
+            log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
+            joinOp = parseUsingForJoin("merge", gis, lp, isFullOuter, isRightOuter, isOuter);
+        	}
+	    | ("\"hash\"" | "\"default\"") {
+		    log.info("[WARN] Use of double-quotes for specifying join algorithm is deprecated. Please use single quotes."); 
+            joinOp = parseUsingForJoin("hash", gis, lp, isFullOuter, isRightOuter, isOuter);
 		    	}
-		    |"\"default\"" {
-		    		joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
-		    		joinOp.pinOption(LOJoin.OPTION_JOIN);
-		    	})
-	    ] 
-    )
-    )
+     )]))
 
 	{log.trace("Exiting JoinClause");
 	if (joinOp!=null) {

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Feb  9 22:40:13 2010
@@ -101,7 +101,10 @@
          * logical operator, and the value is the canonical name
          * associated with the field for that operator.
          */
-        private Map<String, LogicalOperator> canonicalMap = null;
+        // marking transient since this data structure is only used in front-end
+        // query planning to figure out lineage for casts on bytearrays and need
+        // not be serialized to the backend
+        transient private Map<String, LogicalOperator> canonicalMap = null;
 
         /**
          * A reverse lookup of canonical names to logical operators. The reverse
@@ -109,7 +112,10 @@
          * cannot be determined. In such cases the keys of the reverse lookup
          * can be used to navigate the plan
          */
-        private MultiMap<LogicalOperator, String> reverseCanonicalMap = null;
+        // marking transient since this data structure is only used in front-end
+        // query planning to figure out lineage for casts on bytearrays and need
+        // not be serialized to the backend
+        transient private MultiMap<LogicalOperator, String> reverseCanonicalMap = null;
         
         /**
          * Canonical namer object to generate new canonical names on

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java Tue Feb  9 22:40:13 2010
@@ -864,7 +864,9 @@
            }
        }
        mFromEdges.removeKey(node);
-       mFromEdges.put(node,newSuccessors);
+       if (!newSuccessors.isEmpty()) {
+           mFromEdges.put(node,newSuccessors);
+       }
     }    
 
     // removes entry  for predecessor in list of predecessors of node, 
@@ -885,7 +887,9 @@
            }
        }
        mToEdges.removeKey(node);
-       mToEdges.put(node,newPredecessors);
+       if (!newPredecessors.isEmpty()) {
+           mToEdges.put(node,newPredecessors);
+       }
     }
     
     /**

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java Tue Feb  9 22:40:13 2010
@@ -32,6 +32,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 import java.util.jar.JarEntry;
 import java.util.jar.JarInputStream;
@@ -96,7 +97,7 @@
      * @throws ClassNotFoundException
      * @throws IOException
      */
-    public static void createJar(OutputStream os, List<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
+    public static void createJar(OutputStream os, Set<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
         Vector<JarListEntry> jarList = new Vector<JarListEntry>();
         for(String toSend: pigPackagesToSend) {
             addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/grunt/GruntParser.java Tue Feb  9 22:40:13 2010
@@ -411,9 +411,9 @@
     protected void processSet(String key, String value) throws IOException, ParseException {
         if (key.equals("debug"))
         {
-            if (value.equals("on") || value.equals("'on'"))
+            if (value.equals("on"))
                 mPigServer.debugOn();
-            else if (value.equals("off") || value.equals("'off'"))
+            else if (value.equals("off"))
                 mPigServer.debugOff();
             else
                 throw new ParseException("Invalid value " + value + " provided for " + key);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Tue Feb  9 22:40:13 2010
@@ -473,7 +473,7 @@
 	|
 	<REGISTER>
 	t1 = GetPath()
-	{processRegister(t1.image);}
+	{processRegister(unquote(t1.image));}
 	|
 	Script()
 	|
@@ -496,7 +496,7 @@
 	(
 		t1 = GetKey()
 		t2 = GetValue()
-		{processSet(t1.image, t2.image);}
+		{processSet(t1.image, unquote(t2.image));}
 	)
 	|
 	<EOF>

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/InputSchemaUDF.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/InputSchemaUDF.java?rev=908262&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/InputSchemaUDF.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/InputSchemaUDF.java Tue Feb  9 22:40:13 2010
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Test UDF: outputSchema() stores its input schema in the UDFContext, exec() returns it as a string.
+ */
+public class InputSchemaUDF extends EvalFunc<String>{
+
+    @Override
+    public String exec(Tuple input) throws IOException {
+        Schema sch = (Schema)UDFContext.getUDFContext().getUDFProperties(this.getClass()).get("myschema");
+        return sch == null ? null : sch.toString(); // null when no input schema was recorded
+    }
+    
+    @Override
+    public Schema outputSchema(Schema input) {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(this.getClass());
+        if (input != null) { props.put("myschema", input); } // Properties rejects null values
+        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); // exec() yields a String
+    }
+
+}

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCollectedGroup.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCollectedGroup.java Tue Feb  9 22:40:13 2010
@@ -32,9 +32,12 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.LogicalPlanTester;
 import org.apache.pig.test.utils.TestHelper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.junit.After;
 import org.junit.Before;
@@ -76,6 +79,22 @@
         Util.deleteFile(cluster, INPUT_FILE);
     }
     
+    public void testCollectedGrpSpecifiedInSingleQuotes1() {
+        // 'collected' is written as a quoted literal in the USING clause.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+        LogicalPlan plan = planTester.buildPlan("B = group A by id using 'collected';");
+        assertEquals(LOCogroup.GROUPTYPE.COLLECTED, ((LOCogroup) plan.getLeaves().get(0)).getGroupType());
+    }
+    
+    public void testCollectedGrpSpecifiedInSingleQuotes2() {
+        // 'regular' is written as a quoted literal in the USING clause of a GROUP ALL.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+        LogicalPlan plan = planTester.buildPlan("B = group A all using 'regular';");
+        assertEquals(LOCogroup.GROUPTYPE.REGULAR, ((LOCogroup) plan.getLeaves().get(0)).getGroupType());
+    }
+    
     public void testPOMapsideGroupNoNullPlans() throws IOException {
         POCollectedGroup pmg = new POCollectedGroup(new OperatorKey());
         List<PhysicalPlan> plans = pmg.getPlans();

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestDataBag.java Tue Feb  9 22:40:13 2010
@@ -1082,6 +1082,40 @@
         }
         assertEquals(bg6, bg7);
     }
+    
+    // See PIG-1231: repeated hasNext() calls on an exhausted bag iterator
+    // must keep returning false.
+    @Test
+    public void testDataBagIterIdempotent() throws Exception {
+        // Bag types that are spilled before being iterated:
+        DataBag bag = new DefaultDataBag();
+        processDataBag(bag, true);
+        bag = new DistinctDataBag();
+        processDataBag(bag, true);
+        bag = new InternalDistinctBag();
+        processDataBag(bag, true);
+        bag = new InternalSortedBag();
+        processDataBag(bag, true);
+        bag = new SortedDataBag(null);
+        processDataBag(bag, true);
+
+        // InternalCachedBag is the one bag type here that is iterated
+        // without calling spill() first.
+        bag = new InternalCachedBag(0, 0);
+        processDataBag(bag, false);
+    }
+    
+    void processDataBag(DataBag bg, boolean doSpill) {
+        Tuple t = TupleFactory.getInstance().newTuple(Integer.valueOf(0));
+        bg.add(t);
+        if (doSpill) { bg.spill(); } // iterate after spill() so the spilled-data path is presumably exercised
+        Iterator<Tuple> iter = bg.iterator();
+        assertTrue(iter.hasNext());
+        iter.next();
+        assertFalse(iter.hasNext());
+        // Asking again after exhaustion must give the same answer (PIG-1231).
+        assertFalse("hasNext should be idempotent", iter.hasNext());
+    }
 }
 
 

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java Tue Feb  9 22:40:13 2010
@@ -789,5 +789,51 @@
         Grunt grunt = new Grunt(new BufferedReader(reader), context);
 
         grunt.exec();
+        assertEquals("high", context.getProperties().getProperty(PigContext.JOB_PRIORITY));
+    }
+    
+    public void testSetWithQuotes() throws Throwable {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pigContext = pigServer.getPigContext();
+
+        // The value is quoted in the script; the stored property must be the bare value.
+        String script = "set job.priority 'high'\n";
+        BufferedReader in = new BufferedReader(
+                new InputStreamReader(new ByteArrayInputStream(script.getBytes())));
+
+        Grunt grunt = new Grunt(in, pigContext);
+        grunt.exec();
+
+        assertEquals("high", pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY));
+    }
+    
+    public void testRegisterWithQuotes() throws Throwable {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pigContext = pigServer.getPigContext();
+
+        // The jar path is quoted in the script; the registered jar must still be found.
+        String script = "register 'pig.jar'\n";
+        BufferedReader in = new BufferedReader(
+                new InputStreamReader(new ByteArrayInputStream(script.getBytes())));
+
+        Grunt grunt = new Grunt(in, pigContext);
+        grunt.exec();
+
+        assertTrue(pigContext.extraJars.contains(ClassLoader.getSystemResource("pig.jar")));
+    }
+    
+    public void testRegisterWithoutQuotes() throws Throwable {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pigContext = pigServer.getPigContext();
+
+        // The unquoted form of REGISTER must keep working.
+        String script = "register pig.jar\n";
+        BufferedReader in = new BufferedReader(
+                new InputStreamReader(new ByteArrayInputStream(script.getBytes())));
+
+        Grunt grunt = new Grunt(in, pigContext);
+        grunt.exec();
+
+        assertTrue(pigContext.extraJars.contains(ClassLoader.getSystemResource("pig.jar")));
     }
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java Tue Feb  9 22:40:13 2010
@@ -32,10 +32,12 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOJoin.JOINTYPE;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.test.utils.Identity;
 import org.apache.pig.test.utils.LogicalPlanTester;
 import org.junit.Before;
 import org.junit.Test;
@@ -95,7 +97,6 @@
         }
     }
 
-    
     @Test
     public void testJoinUnkownSchema() throws Exception {
         // If any of the input schema is unknown, the resulting schema should be unknown as well
@@ -109,7 +110,7 @@
             assertTrue(schema == null);
         }
     }
-    
+
     @Test
     public void testDefaultJoin() throws IOException, ParseException {
         for (ExecType execType : execTypes) {
@@ -553,5 +554,54 @@
             deleteInputFile(execType, secondInput);
         }
     }
-
+    
+    @Test
+    public void testLiteralsForJoinAlgoSpecification1() {
+        // The join algorithm is named with a quoted literal: 'merge'.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'A'; ");
+        planTester.buildPlan("b = load 'B'; ");
+        LogicalPlan plan = planTester.buildPlan("c = Join a by $0, b by $0 using 'merge'; ");
+        assertEquals(JOINTYPE.MERGE, ((LOJoin) plan.getLeaves().get(0)).getJoinType());
+    }
+    
+    @Test
+    public void testLiteralsForJoinAlgoSpecification2() {
+        // The join algorithm is named with a quoted literal: 'hash'.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'A'; ");
+        planTester.buildPlan("b = load 'B'; ");
+        LogicalPlan plan = planTester.buildPlan("c = Join a by $0, b by $0 using 'hash'; ");
+        assertEquals(JOINTYPE.HASH, ((LOJoin) plan.getLeaves().get(0)).getJoinType());
+    }
+    
+    @Test
+    public void testLiteralsForJoinAlgoSpecification5() {
+        // A quoted 'default' is expected to select the hash join.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'A'; ");
+        planTester.buildPlan("b = load 'B'; ");
+        LogicalPlan plan = planTester.buildPlan("c = Join a by $0, b by $0 using 'default'; ");
+        assertEquals(JOINTYPE.HASH, ((LOJoin) plan.getLeaves().get(0)).getJoinType());
+    }
+    
+    @Test
+    public void testLiteralsForJoinAlgoSpecification3() {
+        // The quoted shorthand 'repl' is expected to select the replicated join.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'A'; ");
+        planTester.buildPlan("b = load 'B'; ");
+        LogicalPlan plan = planTester.buildPlan("c = Join a by $0, b by $0 using 'repl'; ");
+        assertEquals(JOINTYPE.REPLICATED, ((LOJoin) plan.getLeaves().get(0)).getJoinType());
+    }
+    
+    @Test
+    public void testLiteralsForJoinAlgoSpecification4() {
+        // The join algorithm is named with a quoted literal: 'replicated'.
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'A'; ");
+        planTester.buildPlan("b = load 'B'; ");
+        LogicalPlan plan = planTester.buildPlan("c = Join a by $0, b by $0 using 'replicated'; ");
+        assertEquals(JOINTYPE.REPLICATED, ((LOJoin) plan.getLeaves().get(0)).getJoinType());
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocal2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocal2.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocal2.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocal2.java Tue Feb  9 22:40:13 2010
@@ -228,6 +228,44 @@
         Assert.assertEquals(count, actualCount);
     }
 
+    @Test
+    public void testLocalInit() throws Exception {
+        File pigFile = new File("script.pig");
+        File siteFile = new File("mapred-site.xml");
+        try {
+            pigFile.createNewFile();
+            // Run 1: empty script in local mode with no mapred-site.xml in the working directory.
+            int status = Util.executeShellCommand("java -cp "+ 
+                    System.getProperty("java.class.path") + 
+                    "  org.apache.pig.Main -x local " + pigFile.getAbsolutePath() );
+            assertEquals( "Without a mapred-site.xml pig should just run", 0, status );
+
+            String contents = "<?xml version=\"1.0\"?>\n" +
+            "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" +
+            "<configuration xmlns:xi=\"http://www.w3.org/2001/XInclude\">\n" +
+            "  <property>\n" +
+            "    <name>mapred.system.dir</name>\n" +
+            "    <value>/mapredsystem/hadoop/mapredsystem</value>\n" +
+            "    <description>No description</description>\n" +
+            "    <final>true</final>\n" +
+            "  </property>\n" +
+            "</configuration>\n";
+            assertTrue( siteFile.createNewFile() );
+            PrintStream ps = new PrintStream(siteFile);
+            try { ps.print(contents); } finally { ps.close(); } // close even if print fails
+            // Run 2: same script, but mapred-site.xml now declares mapred.system.dir as final.
+            int status2 = Util.executeShellCommand("java -cp "+ 
+                    System.getProperty("java.class.path") + 
+                    "  org.apache.pig.Main -x local " + pigFile.getAbsolutePath() );
+            assertEquals( "With mapred.system.dir redefined in mapred-site.xml pig " +
+                    "should exit", 2, status2 );
+        } finally {
+            if( siteFile.exists() ) 
+                siteFile.delete();
+            if( pigFile.exists() )
+                pigFile.delete();
+        }
+    }
 
     /***
      * For generating a sample dataset
@@ -276,5 +314,4 @@
         return TestHelper.createTempFile(data) ;
     }
 
-
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java Tue Feb  9 22:40:13 2010
@@ -17,11 +17,21 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTuple;
@@ -173,4 +183,52 @@
         assertEquals(db.size(), size);
     }
 
+    @Test
+    public void testMultiQueryJiraPig1194() {
+
+        // test case: POLocalRearrange doesn't handle nulls returned by POBinCond 
+        String INPUT_FILE = "data.txt";
+        final MiniCluster cluster = MiniCluster.buildCluster();
+        
+        try {
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            try {
+                w.println("10\t2\t3");
+                w.println("20\t3\t");   // third field left empty, so a2 is null for this row
+            } finally {
+                w.close(); // release the file handle even if writing fails
+            }
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            PigServer myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);");
+            // Row 2 has no a2: the BinCond yields null and its group key is null (see expected results).
+            myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : -1);");
+            
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(10,{(10,2,3)})",
+                            "(null,{(20,3,null)})"
+                    });
+            
+            Iterator<Tuple> iter = myPig.openIterator("grp");
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            }
+            assertEquals(expectedResults.size(), counter);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("Unexpected exception: " + e);
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail("Failed to delete " + INPUT_FILE + " from the cluster: " + e);
+            }
+        }
+    }
+    
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMRCompiler.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMRCompiler.java Tue Feb  9 22:40:13 2010
@@ -914,8 +914,9 @@
         MapReduceOper mrOper = mrPlan.getRoots().get(0);
         
         assertTrue(mrOper.UDFs.size()==2);
-        assertTrue(mrOper.UDFs.get(0).equals("BinStorage"));
-        assertTrue(mrOper.UDFs.get(1).equals("org.apache.pig.builtin.PigStorage"));
+        assertTrue(mrOper.UDFs.size()==2);
+        assertTrue(mrOper.UDFs.contains("BinStorage"));
+        assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage"));
     }
 
     @Test

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestNullConstant.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestNullConstant.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestNullConstant.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestNullConstant.java Tue Feb  9 22:40:13 2010
@@ -175,7 +175,6 @@
         pigServer.registerQuery("b = foreach a generate {(null)}, ['2'#null];");
         Iterator<Tuple> it = pigServer.openIterator("b");
         Tuple t = it.next();
-System.out.println("tuple: " + t);
         assertEquals(null, ((DataBag)t.get(0)).iterator().next().get(0));
         assertEquals(null, ((Map<String, Object>)t.get(1)).get("2"));
         Util.deleteFile(cluster, inputFileName);

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestOperatorPlan.java?rev=908262&r1=908261&r2=908262&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestOperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestOperatorPlan.java Tue Feb  9 22:40:13 2010
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.test;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -2948,5 +2950,38 @@
         
     }
 
+    // See PIG-1212
+    @Test
+    public void testPushBefore2() throws Exception {
+        TPlan plan = new TPlan();
+        // Shape: Load0 -> ForEach0 -> Join, Load1 -> ForEach1 -> Join, Join -> Filter
+        TOperator[] ops = new TOperator[] {
+            new SingleOperator("Load0"),
+            new SingleOperator("Load1"),
+            new SingleOperator("ForEach0"),
+            new SingleOperator("ForEach1"),
+            new MultiOperator("Join"),
+            new SingleOperator("Filter"),
+        };
+        for (TOperator op : ops) {
+            plan.add(op);
+        }
+        plan.connect(ops[0], ops[2]);
+        plan.connect(ops[1], ops[3]);
+        plan.connect(ops[2], ops[4]);
+        plan.connect(ops[3], ops[4]);
+        plan.connect(ops[4], ops[5]);
+
+        try {
+            // pushBefore(Join, Filter, 0), then check the resulting plan still prints something.
+            plan.pushBefore(ops[4], ops[5], 0);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            new PlanPrinter<TOperator, TPlan>(new PrintStream(out), plan).visit();
+            assertFalse(out.toString().equals(""));
+        } catch (PlanException ignored) {
+            // NOTE(review): a PlanException from pushBefore is deliberately tolerated here; confirm intent.
+        }
+    }
+
 }