You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/04/07 23:14:52 UTC

svn commit: r645697 - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/impl/ src/org/apache/pig/impl/io/ src/org/apache/pig/impl/physicalLayer/plans/ src/org/apache/pig/impl/...

Author: gates
Date: Mon Apr  7 14:14:48 2008
New Revision: 645697

URL: http://svn.apache.org/viewvc?rev=645697&view=rev
Log:
Shravan's incr3 patch, with new POLoad and POStore operators.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon Apr  7 14:14:48 2008
@@ -134,13 +134,16 @@
     <target name="compile-sources">
         <javac encoding="${build.encoding}" srcdir="${sources}"
             includes="**/plan/*.java, **/data/*.java, **/pig/builtin/*.java,
-                **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
+                **/test/utils/*.java, **/test/TestOperatorPlan.java, **/test/TestBuiltin.java,
                 **/test/TestConstExpr.java, **/test/TestFilter.java, **/test/TestPhyOp.java,
                 **/test/TestAdd.java, **/test/TestSubtract.java, **/test/TestMultiply.java,
                 **/test/TestDivide.java, **/test/TestMod.java, **/test/TestGreaterThan.java,
         	    **/test/TestGTOrEqual.java,**/test/TestLessThan.java,**/test/TestLTOrEqual.java,
         	    **/test/TestEqualTo.java,**/test/TestNotEqualTo.java, **/test/TestPOGenerate.java,
-                **/test/TestProject.java, **/test/utils/*.java, **/logicalLayer/*.java,
+                **/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
+                 **/test/FakeFSOutputStream.java,
+                **/test/FakeFSInputStream.java, **/test/Util.java,
+                **/logicalLayer/*.java,
                 **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
                 **/physicalLayer/topLevelOperator/**/*.java, **/physicalLayer/plans/*.java,
 				**/physicalLayer/Result.java, **/physicalLayer/POStatus.java"
@@ -244,6 +247,8 @@
                 	<include name="**/TestEqualTo.java" />
                 	<include name="**/TestNotEqualTo.java" />
                 	<include name="**/TestPOGenerate.java" />
+                	<include name="**/TestLoad.java" />
+                	<include name="**/TestStore.java" />
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Mon Apr  7 14:14:48 2008
@@ -34,23 +34,20 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+//TODO FIX Need to uncomment this with the right imports
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.util.WrappedIOException;
 
 
@@ -233,11 +230,12 @@
         // TODO FIX Need to change so that only syntax parsing is done here, and so that logical plan is additive.
         // parse the query into a logical plan
         LogicalPlan lp = null;
-        try {
+//      TODO FIX Need to uncomment this with the right logic
+        /*try {
             lp = (new LogicalPlanBuilder(pigContext).parse(scope, query, aliases, opTable));
         } catch (ParseException e) {
             throw (IOException) new IOException(e.getMessage()).initCause(e);
-        }
+        }*/
         
         /*
         if (lp.getAlias() != null) {
@@ -469,7 +467,9 @@
     }
     
     public long totalHadoopTimeSpent() {
-        return MapReduceLauncher.totalHadoopTimeSpent;
+//      TODO FIX Need to uncomment this with the right logic
+//        return MapReduceLauncher.totalHadoopTimeSpent;
+        return 0L;
     }
   
     public Map<String, LogicalPlan> getAliases() {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Apr  7 14:14:48 2008
@@ -33,28 +33,20 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
-
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.shock.SSHSocketImplFactory;
@@ -218,7 +210,8 @@
     public ExecPhysicalPlan compile(LogicalPlan[] plans,
                                     Properties properties)
             throws ExecException {
-        if (plans == null) {
+        // TODO FIX Need to uncomment this with the right logic
+        /*if (plans == null) {
             throw new ExecException("No Plans to compile");
         }
 
@@ -248,13 +241,14 @@
             }            
         }
         
-        return new MapRedPhysicalPlan(physicalKey, physicalOpTable);
+        return new MapRedPhysicalPlan(physicalKey, physicalOpTable);*/
+        throw new ExecException("Unsupported Operation");
     }
 
     public ExecJob execute(ExecPhysicalPlan plan) 
             throws ExecException {
-
-        POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
+        // TODO FIX Need to uncomment this with the right logic
+        /*POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
 
         MapReduceLauncher.initQueryStatus(pom.numMRJobs());  // initialize status, for bookkeeping purposes.
         MapReduceLauncher.setConf(this.conf.getConfiguration());
@@ -290,7 +284,8 @@
             throw new ExecException(e);
         }
         
-        return new HJob(JOB_STATUS.COMPLETED, pigContext, pom.outputFileSpec);
+        return new HJob(JOB_STATUS.COMPLETED, pigContext, pom.outputFileSpec);*/
+        throw new ExecException("Unsupported Operation");
 
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Mon Apr  7 14:14:48 2008
@@ -39,7 +39,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.pig.Main;
+//TODO FIX Need to uncomment this with the right imports
+//import org.apache.pig.Main;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
@@ -48,10 +49,11 @@
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
-import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+//TODO FIX Need to uncomment this with the right imports
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+//import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
+//import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.WrappedIOException;
 
@@ -108,8 +110,9 @@
         this.execType = execType;
 
         initProperties();
-        
-        String pigJar = JarManager.findContainingJar(Main.class);
+        // TODO FIX Need to change this after Main starts working
+//        String pigJar = JarManager.findContainingJar(Main.class);
+        String pigJar = JarManager.findContainingJar(PigContext.class);
         String hadoopJar = JarManager.findContainingJar(FileSystem.class);
         if (pigJar != null) {
             skipJars.add(pigJar);
@@ -132,7 +135,9 @@
             
         try{        
             // first read the properties in the jar file
-            InputStream pis = MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
+            // TODO FIX Need to uncomment this with the right class
+//            InputStream pis = MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
+            InputStream pis = PigContext.class.getClassLoader().getResourceAsStream("properties");
             if (pis != null) {
                 fileProperties.load(pis);
             }
@@ -163,7 +168,8 @@
     public void connect() throws ExecException {
         try {
             switch (execType) {
-            case LOCAL:
+//          TODO FIX Need to uncomment this with the right logic
+            /*case LOCAL:
             {
                 lfs = new HDataStorage(URI.create("file:///"),
                                        new Configuration());
@@ -172,7 +178,7 @@
                 
                 executionEngine = new LocalExecutionEngine(this);
             }
-            break;
+            break;*/
 
             case MAPREDUCE:
             {
@@ -228,7 +234,8 @@
     public void addJar(URL resource) throws MalformedURLException{
         if (resource != null) {
             extraJars.add(resource);
-            LogicalPlanBuilder.classloader = createCl(null);
+//          TODO FIX Need to uncomment this with the right logic
+//            LogicalPlanBuilder.classloader = createCl(null);
         }
     }
 
@@ -344,7 +351,9 @@
         for (int i = 0; i < extraJars.size(); i++) {
             urls[i + passedJar] = extraJars.get(i);
         }
-        return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+//      TODO FIX Need to uncomment this with the right logic
+//        return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+        return new URLClassLoader(urls, PigContext.class.getClassLoader());
     }
     
     public static String getClassNameFromSpec(String funcSpec){
@@ -368,7 +377,9 @@
         for(String prefix: packageImportList) {
             Class c;
         try {
-            c = Class.forName(prefix+name,true, LogicalPlanBuilder.classloader);
+//          TODO FIX Need to uncomment this with the right logic
+//            c = Class.forName(prefix+name,true, LogicalPlanBuilder.classloader);
+            c = Class.forName(prefix+name,true, PigContext.class.getClassLoader());
             return c;
             } catch (ClassNotFoundException e) {
             } catch (LinkageError e) {}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Mon Apr  7 14:14:48 2008
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.impl.io;
 
-import java.lang.IllegalArgumentException;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -26,27 +25,23 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Random;
-import java.util.Stack;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Random;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.WrappedIOException;
 
-import org.apache.pig.backend.datastorage.*;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat.PigRecordReader;
-
-import java.util.Properties;
-
 public class FileLocalizer {
     private static final Log log = LogFactory.getLog(FileLocalizer.class);
     
@@ -148,13 +143,14 @@
      */
     
     public static InputStream openDFSFile(String fileName) throws IOException{
-        PigRecordReader prr = PigInputFormat.PigRecordReader.getPigRecordReader();
+//      TODO FIX Need to uncomment this with the right logic
+        /*PigRecordReader prr = PigInputFormat.PigRecordReader.getPigRecordReader();
         
         if (prr == null)
             throw new RuntimeException("can't open DFS file while executing locally");
     
-        return openDFSFile(fileName, prr.getJobConf());
-        
+        return openDFSFile(fileName, prr.getJobConf());*/
+        throw new IOException("Unsupported Operation");
     }
 
     public static InputStream openDFSFile(String fileName, JobConf conf) throws IOException{
@@ -234,6 +230,29 @@
             }
             
             return new FileOutputStream(fileSpec,append);
+        }
+    }
+    
+    static public boolean delete(String fileSpec, PigContext pigContext) throws IOException{
+        fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
+        if (!fileSpec.startsWith(LOCAL_PREFIX)) {
+            init(pigContext);
+            ElementDescriptor elem = pigContext.getDfs().asElement(fileSpec);
+            elem.delete();
+            return true;
+        }
+        else {
+            fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
+            boolean ret = true;
+            // TODO probably this should be replaced with the local file system
+            File f = (new File(fileSpec));
+            // TODO this only deletes the file. Any dirs created as a part of this
+            // are not removed.
+            if (f!=null){
+                ret = f.delete();
+            }
+            
+            return ret;
         }
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Mon Apr  7 14:14:48 2008
@@ -22,10 +22,10 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
 import org.apache.pig.impl.plan.PlanVisitor;
@@ -55,14 +55,14 @@
         depthFirst();
     }
     
-//    public void visitLoad(POLoad ld){
-//        //do nothing
-//    }
-//    
-//    public void visitStore(POStore st){
-//        //do nothing
-//    }
-//    
+    public void visitLoad(POLoad ld){
+        //do nothing
+    }
+ 
+    public void visitStore(POStore st){
+        //do nothing
+    }
+    
     public void visitFilter(POFilter fl) throws ParseException{
         ExprPlanVisitor epv = new ExprPlanVisitor(fl.getPlan());
         epv.visit();

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java Mon Apr  7 14:14:48 2008
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+
+/**
+ * The load operator which is used in two ways:
+ * 1) As a local operator it can be used to load files
+ * 2) In the Map Reduce setting, it is used to create jobs
+ *    from MapReduce operators which keep the loads and
+ *    stores in the Map and Reduce Plans till the job is created
+ *
+ */
+public class POLoad extends PhysicalOperator<PhyPlanVisitor> {
+    // The user defined load function or a default load function
+    LoadFunc loader = null;
+    // The filespec on which the operator is based
+    FileSpec lFile;
+    // The input stream that the loader binds to
+    InputStream is;
+    // PigContext passed to us by the operator creator
+    PigContext pc;
+    //Indicates whether the loader setup is done or not
+    boolean setUpDone = false;
+    
+    private final Log log = LogFactory.getLog(getClass());
+    
+    public POLoad(OperatorKey k) {
+        this(k,-1, null);
+    }
+
+    
+    public POLoad(OperatorKey k, FileSpec lFile){
+        this(k,-1,lFile);
+    }
+    
+    public POLoad(OperatorKey k, int rp, FileSpec lFile) {
+        super(k, rp);
+    }
+    
+    /**
+     * Set up the loader by 
+     * 1) Instantiating the load func
+     * 2) Opening an input stream to the specified file and
+     * 3) Binding to the input stream
+     * @throws IOException
+     */
+    private void setUp() throws IOException{
+        String filename = lFile.getFileName();
+        loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
+        
+        is = FileLocalizer.open(filename, pc);
+        
+        loader.bindTo(filename , new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+    }
+    
+    /**
+     * At the end of processing, the inputstream is closed
+     * using this method
+     * @throws IOException
+     */
+    private void tearDown() throws IOException{
+        is.close();
+    }
+    
+    /**
+     * The main method used by this operator's successor
+     * to read tuples from the specified file using the
+     * specified load function.
+     * 
+     * @return A Result wrapping whatever the loader returns.
+     *          A null from the loader indicates EOP,
+     *          upon which the input stream is closed
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if(!setUpDone && lFile!=null){
+            try {
+                setUp();
+            } catch (IOException e) {
+                ExecException ee = new ExecException("Unable to setup the loader because of the exception: " + e.getMessage());
+                ee.initCause(e);
+                throw ee;
+            }
+            setUpDone = true;
+        }
+        Result res = new Result();
+        try {
+            res.result = loader.getNext();
+            if(res.result==null){
+                res.returnStatus = POStatus.STATUS_EOP;
+                tearDown();
+            }
+            else
+                res.returnStatus = POStatus.STATUS_OK;
+        } catch (IOException e) {
+            log.error("Received error from loader function: " + e);
+            res.returnStatus = POStatus.STATUS_ERR; 
+            return res;
+        }
+        return res;
+    }
+
+    @Override
+    public String name() {
+        return "Load - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitLoad(this);
+    }
+
+
+    public FileSpec getLFile() {
+        return lFile;
+    }
+
+
+    public void setLFile(FileSpec file) {
+        lFile = file;
+    }
+
+
+    public PigContext getPc() {
+        return pc;
+    }
+
+
+    public void setPc(PigContext pc) {
+        this.pc = pc;
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java Mon Apr  7 14:14:48 2008
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+
+/**
+ * The store operator which is used in two ways:
+ * 1) As a local operator it can be used to store files
+ * 2) In the Map Reduce setting, it is used to create jobs
+ *    from MapReduce operators which keep the loads and
+ *    stores in the Map and Reduce Plans till the job is created
+ *
+ */
+public class POStore extends PhysicalOperator<PhyPlanVisitor> {
+    // The user defined store function or a default store function
+    private StoreFunc storer;
+    // The filespec on which the operator is based
+    FileSpec sFile;
+    // The output stream that the storer binds to
+    OutputStream os;
+    // PigContext passed to us by the operator creator
+    PigContext pc;
+    
+    public POStore(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POStore(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+    
+    public POStore(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+    
+    /**
+     * Set up the storer by 
+     * 1) Instantiating the store func
+     * 2) Opening an output stream to the specified file and
+     * 3) Binding to the output stream
+     * @throws IOException
+     */
+    private void setUp() throws IOException{
+        storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        os = FileLocalizer.create(sFile.getFileName(), pc);
+        storer.bindTo(os);
+    }
+    
+    /**
+     * At the end of processing, the outputstream is closed
+     * using this method
+     * @throws IOException
+     */
+    private void tearDown() throws IOException{
+        os.close();
+    }
+    
+    /**
+     * To perform cleanup when there is an error.
+     * Uses the FileLocalizer method which only 
+     * deletes the file but not the dirs created
+     * with it.
+     * @throws IOException
+     */
+    private void cleanUp() throws IOException{
+        String fName = sFile.getFileName();
+        os.flush();
+        if(FileLocalizer.fileExists(fName,pc))
+            FileLocalizer.delete(fName,pc);
+    }
+    
+    /**
+     * The main method used by the local execution engine
+     * to store tuples into the specified file using the
+     * specified store function. One call to this method
+     * retrieves all tuples from its predecessor operator
+     * and stores them into the file till it receives an EOP.
+     * 
+     * If there is an error, the cleanUp routine is called
+     * and then the tearDown is called to close the OutputStream
+     * 
+     * @return Whatever the predecessor returns
+     *          A null from the predecessor is ignored
+     *          and processing of further tuples continued
+     */
+    public Result store() throws ExecException{
+        try{
+            setUp();
+            Result res;
+            Tuple inpValue = null;
+            while(true){
+                res = processInput();
+                if(res.returnStatus==POStatus.STATUS_OK)
+                    storer.putNext((Tuple)res.result);
+                else if(res.returnStatus==POStatus.STATUS_NULL)
+                    continue;
+                else
+                    break;
+            }
+            if(res.returnStatus==POStatus.STATUS_EOP){
+                storer.finish();
+            }
+            else{
+                cleanUp();
+            }
+            tearDown();
+            return res;
+        }catch(IOException e){
+            e.printStackTrace();
+            return new Result();
+        }
+    }
+
+    @Override
+    public String name() {
+        return "Store - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    public StoreFunc getStorer() {
+        return storer;
+    }
+
+    
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitStore(this);
+    }
+
+    public FileSpec getSFile() {
+        return sFile;
+    }
+
+    public void setSFile(FileSpec file) {
+        sFile = file;
+    }
+
+    public PigContext getPc() {
+        return pc;
+    }
+
+    public void setPc(PigContext pc) {
+        this.pc = pc;
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java Mon Apr  7 14:14:48 2008
@@ -38,7 +38,7 @@
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 import org.apache.pig.impl.PigContext;
 
 
@@ -97,9 +97,12 @@
     public static void createJar(OutputStream os, List<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
         Vector<JarListEntry> jarList = new Vector<JarListEntry>();
         for(String toSend: pigPackagesToSend) {
-            addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);
+//          TODO FIX Need to uncomment this with the right logic
+//            addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);
         }
-        ClassLoader pigClassLoader = PigMapReduce.class.getClassLoader();
+//      TODO FIX Need to uncomment this with the right logic
+//        ClassLoader pigClassLoader = PigMapReduce.class.getClassLoader();
+        ClassLoader pigClassLoader = PigContext.class.getClassLoader();
         
         for (String func: funcs) {
             Class clazz = pigContext.getClassForAlias(func);
@@ -148,7 +151,9 @@
         for (int i = 0; i < pigContext.extraJars.size(); i++) {
             urls[i + passedJar] = new URL("file:" + pigContext.extraJars.get(i));
         }
-        return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+//      TODO FIX Need to uncomment this with the right logic
+//        return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+        return new URLClassLoader(urls, PigContext.class.getClassLoader());
     }
     
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java Mon Apr  7 14:14:48 2008
@@ -37,6 +37,7 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -75,15 +76,6 @@
     public void tearDown() throws Exception {
     }
 
-    private boolean bagContains(DataBag db, Tuple t) {
-        Iterator<Tuple> iter = db.iterator();
-        for (Tuple tuple : db) {
-            if (tuple.compareTo(t) == 0)
-                return true;
-        }
-        return false;
-    }
-
     @Test
     public void testGetNextTuple() throws ExecException, IOException {
         pass.attachInput(t);
@@ -99,7 +91,7 @@
                 break;
             assertEquals(POStatus.STATUS_OK, res.returnStatus);
             Tuple output = (Tuple) res.result;
-            assertEquals(true, bagContains(inp, output));
+            assertEquals(true, TestHelper.bagContains(inp, output));
             assertEquals(true, (Integer) ((Tuple) res.result).get(1) > 50);
         }
     }

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java Mon Apr  7 14:14:48 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link POLoad}. The operator is pointed at /etc/passwd with a
+ * ':'-delimited {@link PigStorage}, and its output is compared against the
+ * same file parsed directly with a {@link BufferedReader}.
+ */
+public class TestLoad {
+    /** File specification handed to the load operator. */
+    FileSpec inpFSpec;
+    /** Operator under test. */
+    POLoad ld;
+    PigContext pc;
+    /** Expected tuples, one per line of /etc/passwd; filled in by {@link #setUp()}. */
+    DataBag inpDB;
+
+    static MiniCluster cluster = MiniCluster.buildCluster();
+
+    /**
+     * Configures the load operator and builds the bag of expected tuples by
+     * reading /etc/passwd line by line and splitting each line on ':'.
+     *
+     * @throws Exception if the context cannot connect or the file cannot be read
+     */
+    @Before
+    public void setUp() throws Exception {
+        inpFSpec = new FileSpec("file:////etc/passwd",PigStorage.class.getName()+"(':')");
+        pc = new PigContext();
+        pc.connect();
+
+        ld = GenPhyOp.topLoadOp();
+        ld.setLFile(inpFSpec);
+        ld.setPc(pc);
+
+        inpDB = DefaultBagFactory.getInstance().newDefaultBag();
+        BufferedReader br = new BufferedReader(new FileReader("/etc/passwd"));
+        try {
+            for(String line = br.readLine();line!=null;line=br.readLine()){
+                // A limit of -1 keeps trailing empty fields so the column count is stable.
+                String[] flds = line.split(":",-1);
+                Tuple t = new DefaultTuple();
+                for (String fld : flds) {
+                    // Empty fields become null; presumably this mirrors what the
+                    // loader emits for them -- confirm against PigStorage.
+                    t.append((fld.compareTo("")!=0 ? new DataByteArray(fld.getBytes()) : null));
+                }
+                inpDB.add(t);
+            }
+        } finally {
+            // Release the file handle even when reading or parsing fails part-way.
+            br.close();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Pulls tuples from the load operator until it reports end of processing,
+     * checking that each one is in the expected bag and that the number of
+     * tuples matches the size of that bag.
+     *
+     * @throws ExecException if the operator fails while producing a tuple
+     */
+    @Test
+    public void testGetNextTuple() throws ExecException {
+        Tuple t=null;
+        int size = 0;
+        for(Result res = ld.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(t)){
+            assertTrue(TestHelper.bagContains(inpDB, (Tuple)res.result));
+            ++size;
+        }
+        assertTrue(size==inpDB.size());
+    }
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Mon Apr  7 14:14:48 2008
@@ -32,6 +32,7 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,15 +71,6 @@
         }
     }
 
-    private boolean bagContains(DataBag db, Tuple t) {
-        Iterator<Tuple> iter = db.iterator();
-        for (Tuple tuple : db) {
-            if (tuple.compareTo(t) == 0)
-                return true;
-        }
-        return false;
-    }
-
     @Test
     public void testGetNextTuple() throws IOException, ExecException {
         proj.attachInput(t);
@@ -91,7 +83,7 @@
             res = proj.getNext(t);
             if (res.returnStatus == POStatus.STATUS_EOP)
                 break;
-            if (!bagContains(inpBag, (Tuple) res.result)) {
+            if (!TestHelper.bagContains(inpBag, (Tuple) res.result)) {
                 contains = false;
                 break;
             }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java Mon Apr  7 14:14:48 2008
@@ -17,100 +17,88 @@
  */
 package org.apache.pig.test;
 
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.Iterator;
+import static org.junit.Assert.assertEquals;
 
-import org.apache.pig.PigServer;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStore {
+    POStore st;
+    FileSpec fSpec;
+    DataBag inpDB;
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    PigContext pc;
+    
+    @Before
+    public void setUp() throws Exception {
+        st = GenPhyOp.topStoreOp();
+        fSpec = new FileSpec("file:////tmp/storeTest.txt",PigStorage.class.getName()+"(':')");
+        st.setSFile(fSpec);
+        pc = new PigContext();
+        pc.connect();
+        st.setPc(pc);
+        
+        POProject proj = GenPhyOp.exprProject();
+        proj.setColumn(0);
+        proj.setResultType(DataType.TUPLE);
+        proj.setOverloaded(true);
+        List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
+        inps.add(proj);
+        st.setInputs(inps);
+        
+        inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
+        Tuple t = new DefaultTuple();
+        t.append(inpDB);
+        proj.attachInput(t);
+    }
 
-import junit.framework.TestCase;
-
-public class TestStore extends TestCase {
-
-    private String initString = "mapreduce";
-    MiniCluster cluster = MiniCluster.buildCluster();
-    private int LOOP_COUNT = 1024;
-    
-    String fileName;
-    String tmpFile1, tmpFile2;
-    PigServer pig;
-    
-    public void testSingleStore() throws Exception{
-        pig.registerQuery("A = load " + fileName + ";");
-        
-        pig.store("A", tmpFile1);
-        
-        pig.registerQuery("B = load " + tmpFile1 + ";");
-        Iterator<Tuple> iter  = pig.openIterator("B");
-        
-        int i =0;
-        while (iter.hasNext()){
-            Tuple t = iter.next();
-            assertEquals(DataType.toInteger(t.get(0)).intValue(),i);
-            assertEquals(DataType.toInteger(t.get(1)).intValue(),i);
-            i++;
-        }
+    @After
+    public void tearDown() throws Exception {
     }
-    
-    public void testMultipleStore() throws Exception{
-        pig.registerQuery("A = load " + fileName + ";");
-        
-        pig.store("A", tmpFile1);
-        
-        pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
-        pig.store("B", tmpFile2);
-        pig.registerQuery("C = load " + tmpFile2 + ";");
-        Iterator<Tuple> iter  = pig.openIterator("C");
-        
-        int i =0;
-        while (iter.hasNext()){
-            Tuple t = iter.next();
-            i++;
+
+    @Test
+    public void testStore() throws ExecException, IOException {
+        Result res = st.store();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        
+        int size = 0;
+        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
+        for(String line=br.readLine();line!=null;line=br.readLine()){
+            String[] flds = line.split(":",-1);
+            Tuple t = new DefaultTuple();
+            t.append(flds[0].compareTo("")!=0 ? flds[0] : null);
+            t.append(Integer.parseInt(flds[1]));
             
+            assertEquals(true, TestHelper.bagContains(inpDB, t));
+            ++size;
         }
-        
-        assertEquals(LOOP_COUNT, i);
-        
-    }
-    
-    public void testStoreWithMultipleMRJobs() throws Exception{
-        pig.registerQuery("A = load " + fileName + ";");        
-        pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
-        pig.registerQuery("C = foreach (group B by $0) generate $0, SUM($1);");
-        pig.registerQuery("D = foreach (group C by $0) generate $0, SUM($1);");
-
-        pig.store("D", tmpFile2);
-        pig.registerQuery("E = load " + tmpFile2 + ";");
-        Iterator<Tuple> iter  = pig.openIterator("E");
-        
-        int i =0;
-        while (iter.hasNext()){
-            Tuple t = iter.next();
-            i++;
-        }
-        
-        assertEquals(LOOP_COUNT, i);
-        
+        assertEquals(true, size==inpDB.size());
+        FileLocalizer.delete(fSpec.getFileName(), pc);
     }
 
-    
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        File f = File.createTempFile("tmp", "");
-        PrintWriter pw = new PrintWriter(f);
-        for (int i=0;i<LOOP_COUNT; i++){
-            pw.println(i + "\t" + i);
-        }
-        pw.close();
-        pig = new PigServer(initString);
-        fileName = "'" + FileLocalizer.hadoopify(f.toString(), pig.getPigContext()) + "'";
-        tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
-        tmpFile2 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
-        f.delete();
-    }
-    
 }

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java Mon Apr  7 14:14:48 2008
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+
+import junit.framework.TestCase;
+
+/**
+ * Store tests driven through {@link PigServer} in "mapreduce" mode against a
+ * {@link MiniCluster}. Each test loads a generated file of
+ * {@code LOOP_COUNT} tab-separated lines of the form {@code i<TAB>i}, stores
+ * an alias, loads the stored data back and inspects it.
+ */
+public class TestStoreOld extends TestCase {
+
+    private String initString = "mapreduce";
+    MiniCluster cluster = MiniCluster.buildCluster();
+    /** Number of lines written to the input file. */
+    private int LOOP_COUNT = 1024;
+
+    /** Quoted location of the generated input, as used inside load statements. */
+    String fileName;
+    /** Quoted temporary output locations. */
+    String tmpFile1, tmpFile2;
+    PigServer pig;
+
+    /**
+     * Stores the loaded input unchanged and reads it back, expecting row
+     * {@code i} to hold the value {@code i} in both columns and expecting
+     * exactly {@code LOOP_COUNT} rows.
+     */
+    public void testSingleStore() throws Exception{
+        pig.registerQuery("A = load " + fileName + ";");
+
+        pig.store("A", tmpFile1);
+
+        pig.registerQuery("B = load " + tmpFile1 + ";");
+        Iterator<Tuple> iter  = pig.openIterator("B");
+
+        int i =0;
+        while (iter.hasNext()){
+            Tuple t = iter.next();
+            // assertEquals takes (expected, actual); keeping that order makes
+            // failure messages report the right value as "expected".
+            assertEquals(i, DataType.toInteger(t.get(0)).intValue());
+            assertEquals(i, DataType.toInteger(t.get(1)).intValue());
+            i++;
+        }
+        // Without this check an empty result would pass the loop above.
+        assertEquals(LOOP_COUNT, i);
+    }
+
+    /**
+     * Stores two aliases in sequence (the raw input, then a grouped sum) and
+     * checks that the second stored result holds {@code LOOP_COUNT} rows.
+     */
+    public void testMultipleStore() throws Exception{
+        pig.registerQuery("A = load " + fileName + ";");
+
+        pig.store("A", tmpFile1);
+
+        pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
+        pig.store("B", tmpFile2);
+        pig.registerQuery("C = load " + tmpFile2 + ";");
+
+        assertEquals(LOOP_COUNT, countTuples(pig.openIterator("C")));
+    }
+
+    /**
+     * Stores the result of three chained group/sum steps and checks that the
+     * stored result holds {@code LOOP_COUNT} rows.
+     */
+    public void testStoreWithMultipleMRJobs() throws Exception{
+        pig.registerQuery("A = load " + fileName + ";");
+        pig.registerQuery("B = foreach (group A by $0) generate $0, SUM($1);");
+        pig.registerQuery("C = foreach (group B by $0) generate $0, SUM($1);");
+        pig.registerQuery("D = foreach (group C by $0) generate $0, SUM($1);");
+
+        pig.store("D", tmpFile2);
+        pig.registerQuery("E = load " + tmpFile2 + ";");
+
+        assertEquals(LOOP_COUNT, countTuples(pig.openIterator("E")));
+    }
+
+    /** Drains the iterator and returns how many tuples it produced. */
+    private static int countTuples(Iterator<Tuple> iter) {
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * Writes the input file, creates the {@link PigServer} and resolves the
+     * input and temporary output locations used by the tests.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        File f = File.createTempFile("tmp", "");
+        try {
+            PrintWriter pw = new PrintWriter(f);
+            try {
+                for (int i=0;i<LOOP_COUNT; i++){
+                    pw.println(i + "\t" + i);
+                }
+            } finally {
+                // Close the writer even if writing fails.
+                pw.close();
+            }
+            pig = new PigServer(initString);
+            fileName = "'" + FileLocalizer.hadoopify(f.toString(), pig.getPigContext()) + "'";
+            tmpFile1 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+            tmpFile2 = "'" + FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString() + "'";
+        } finally {
+            // The local temp file is removed on failure as well as on success.
+            f.delete();
+        }
+    }
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=645697&r1=645696&r2=645697&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Mon Apr  7 14:14:48 2008
@@ -26,10 +26,10 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 //import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
@@ -94,10 +94,10 @@
 //        return ret;
 //    }
 //    
-//    public static POLoad topLoadOp(){
-//        POLoad ret = new POLoad(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
+   /**
+    * Creates a new {@link POLoad} operator.
+    *
+    * @return a POLoad built from an {@code OperatorKey} whose first argument
+    *         is the empty string and whose second is {@code r.nextLong()}
+    */
+   public static POLoad topLoadOp(){
+       final OperatorKey key = new OperatorKey("",r.nextLong());
+       return new POLoad(key);
+   }
     
     public static POFilter topFilterOp(){
         POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
@@ -132,14 +132,14 @@
     }
     
     public static POFilter topFilterOpWithProj(int col, int rhsVal) throws IOException{
-    	POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
-    	
-    	POProject proj = exprProject();
-    	proj.setResultType(DataType.INTEGER);
-    	proj.setColumn(col);
-    	proj.setOverloaded(false);
-    	
-    	ConstantExpression ce2 = GenPhyOp.exprConst();
+        POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
+        
+        POProject proj = exprProject();
+        proj.setResultType(DataType.INTEGER);
+        proj.setColumn(col);
+        proj.setOverloaded(false);
+        
+        ConstantExpression ce2 = GenPhyOp.exprConst();
         ce2.setValue(rhsVal);
         
         GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
@@ -170,10 +170,10 @@
 //        return ret;
 //    }
 //    
-//    public static POStore topStoreOp(){
-//        POStore ret = new POStore(new OperatorKey("",r.nextLong()));
-//        return ret;
-//    }
+   /**
+    * Creates a new {@link POStore} operator.
+    *
+    * @return a POStore built from an {@code OperatorKey} whose first argument
+    *         is the empty string and whose second is {@code r.nextLong()}
+    */
+   public static POStore topStoreOp(){
+       final OperatorKey key = new OperatorKey("",r.nextLong());
+       return new POStore(key);
+   }
 //    
 //    public static StartMap topStartMapOp(){
 //        StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));

Added: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=645697&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Mon Apr  7 14:14:48 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Static helper methods shared by the unit tests.
+ */
+public class TestHelper {
+    /**
+     * Checks whether a bag holds a tuple that compares equal to the given one.
+     * The bag is scanned in iteration order and the search stops at the first
+     * tuple for which {@code compareTo} returns 0.
+     *
+     * @param db the bag to search
+     * @param t the tuple to look for
+     * @return {@code true} if some tuple in {@code db} compares equal to
+     *         {@code t}; {@code false} otherwise, including when the bag
+     *         yields no tuples
+     */
+    public static boolean bagContains(DataBag db, Tuple t) {
+        for (Tuple tuple : db) {
+            if (tuple.compareTo(t) == 0)
+                return true;
+        }
+        return false;
+    }
+}