You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/27 16:51:51 UTC

svn commit: r990165 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...

Author: thejas
Date: Fri Aug 27 14:51:50 2010
New Revision: 990165

URL: http://svn.apache.org/viewvc?rev=990165&view=rev
Log:
PIG-506 - patch PIG-506.3.patch with change suggested by Daniel in jira
changes to work with new logical plan and other fixes

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java
Modified:
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java

Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Fri Aug 27 14:51:50 2010
@@ -626,7 +626,6 @@
                     <exclude name="**/TestOrderBy.java" />
                     <exclude name="**/TestOrderBy2.java" />
                     <exclude name="**/TestPi.java" />
-                    <exclude name="**/TestNativeMapReduce.java" />
                     <exclude name="**/nightly/**" />
                     <!-- <exclude name="**/pigunit/**" /> -->
                     <exclude name="**/${exclude.testcase}.java" if="exclude.testcase" />

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Aug 27 14:51:50 2010
@@ -68,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
@@ -149,9 +150,6 @@ public class MRCompiler extends PhyPlanV
     //The output of compiling the inputs
     MapReduceOper[] compiledInputs = null;
 
-    //Mapping of which MapReduceOper a store belongs to.
-    Map<POStore, MapReduceOper> storeToMapReduceMap;
-    
     //The split operators seen till now. If not
     //maintained they will haunt you.
     //During the traversal a split is the only
@@ -199,7 +197,6 @@ public class MRCompiler extends PhyPlanV
         }
         scope = roots.get(0).getOperatorKey().getScope();
         messageCollector = new CompilationMessageCollector() ;
-        storeToMapReduceMap = new HashMap<POStore, MapReduceOper>();
         phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
     }
     
@@ -261,9 +258,17 @@ public class MRCompiler extends PhyPlanV
             }
         }
 
+        // get all stores and nativeMR operators, sort them in order(operator id)
+        // and compile their plans
         List<POStore> stores = PlanHelper.getStores(plan);
-        for (POStore store: stores) {
-            compile(store);
+        List<PONative> nativeMRs= PlanHelper.getNativeMRs(plan);
+        List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());
+        ops.addAll(stores);
+        ops.addAll(nativeMRs);
+        Collections.sort(ops);
+        
+        for (PhysicalOperator op : ops) {
+            compile(op);
         }
         
         // I'm quite certain this is not the best way to do this.  The issue
@@ -313,7 +318,11 @@ public class MRCompiler extends PhyPlanV
         //store them away so that we can use them for compiling
         //op.
         List<PhysicalOperator> predecessors = plan.getPredecessors(op);
-        if (predecessors != null && predecessors.size() > 0) {
+        if(op instanceof PONative){
+            // the predecessor (store) has already been processed
+            // don't process it again
+        }
+        else if (predecessors != null && predecessors.size() > 0) {
             // When processing an entire script (multiquery), we can
             // get into a situation where a load has
             // predecessors. This means that it depends on some store
@@ -331,9 +340,12 @@ public class MRCompiler extends PhyPlanV
                 }
 
                 PhysicalOperator p = predecessors.get(0);
-                if (!(p instanceof POStore)) {
+                MapReduceOper oper = null;
+                if(p instanceof POStore || p instanceof PONative){
+                    oper = phyToMROpMap.get(p); 
+                }else{
                     int errCode = 2126;
-                    String msg = "Predecessor of load should be a store. Got "+p.getClass();
+                    String msg = "Predecessor of load should be a store or mapreduce operator. Got "+p.getClass();
                     throw new PlanException(msg, errCode, PigException.BUG);
                 }
 
@@ -341,8 +353,6 @@ public class MRCompiler extends PhyPlanV
                 curMROp = getMROp();
                 curMROp.mapPlan.add(op);
                 MRPlan.add(curMROp);
-                
-                MapReduceOper oper = storeToMapReduceMap.get((POStore)p);
 
                 plan.disconnect(op, p);
                 MRPlan.connect(oper, curMROp);
@@ -760,39 +770,13 @@ public class MRCompiler extends PhyPlanV
     @Override
     public void visitNative(PONative op) throws VisitorException{
         // We will explode the native operator here to add a new MROper for native Mapreduce job
-        // We will also add respective Load and Store operators for this native job
         try{
-            POStore st = op.getInnerStore();
-            st.setAlias(op.getAlias());
-            st.setIsTmpStore(true);
-            st.setParentPlan(mPlan);
-            mPlan.add(st);
-            
-            PhysicalOperator pred = mPlan.getPredecessors(op).get(0);
-            mPlan.disconnect(pred, op);
-            mPlan.connect(pred, st);
-            
-            nonBlocking(st);
-            
-            MapReduceOper prevMROp = phyToMROpMap.get(pred);
-            storeToMapReduceMap.put(st, prevMROp);
-            phyToMROpMap.put(st, prevMROp);
-            if (st.getSFile()!=null && st.getSFile().getFuncSpec()!=null)
-                curMROp.UDFs.add(st.getSFile().getFuncSpec().toString());
-            
             // add a map reduce boundary
-            MapReduceOper nativeMR = getNativeMROp(op.getNativeMRjar(), op.getParams());
-            MRPlan.add(nativeMR);
-            MRPlan.connect(curMROp, nativeMR);
-            
-            // add one more map reduce boundary
-            POLoad ld = op.getInnerLoad();
-            ld.setAlias(op.getAlias());
-            curMROp = getMROp();
-            curMROp.mapPlan.add(ld);
-            MRPlan.add(curMROp);
-            MRPlan.connect(nativeMR, curMROp);
-            
+            MapReduceOper nativeMROper = getNativeMROp(op.getNativeMRjar(), op.getParams());
+            MRPlan.add(nativeMROper);
+            MRPlan.connect(curMROp, nativeMROper);
+            phyToMROpMap.put(op, nativeMROper);
+            curMROp = nativeMROper;
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -803,7 +787,6 @@ public class MRCompiler extends PhyPlanV
     @Override
     public void visitStore(POStore op) throws VisitorException{
         try{
-            storeToMapReduceMap.put(op, curMROp);
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
             if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Aug 27 14:51:50 2010
@@ -126,6 +126,7 @@ public class MapReduceLauncher extends L
         PigStatsUtil.startCollection(pc, jobClient, jcc, mrp); 
         
         List<Job> failedJobs = new LinkedList<Job>();
+        List<NativeMapReduceOper> failedNativeMR = new LinkedList<NativeMapReduceOper>();
         List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
         List<Job> succJobs = new LinkedList<Job>();
         JobControl jc;
@@ -136,7 +137,10 @@ public class MapReduceLauncher extends L
         //create the exception handler for the job control thread
         //and register the handler with the job control thread
         JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
-
+        
+        boolean stop_on_failure = 
+            pc.getProperties().getProperty("stop.on.failure", "false").equals("true");
+        
         // jc is null only when mrp.size == 0
         while(mrp.size() != 0) {
             jc = jcc.compile(mrp, grpName);
@@ -152,14 +156,34 @@ public class MapReduceLauncher extends L
                             ScriptState.get().emitJobsSubmittedNotification(1);
                             natOp.runJob();
                             numMRJobsCompl++;
-                            double prog = ((double)numMRJobsCompl)/totalMRJobs;
-                            notifyProgress(prog, lastProg);
-                            lastProg = prog;
-                            mrp.remove(natOp);
-                        } catch (JobCreationException je) {
+                        } catch (IOException e) {
+                            
                             mrp.trimBelow(natOp);
-                            mrp.remove(natOp);
+                            failedNativeMR.add(natOp);
+                            
+                            String msg = "Error running native mapreduce" +
+                            " operator job :" + natOp.getJobId() + e.getMessage();
+                            
+                            String stackTrace = getStackStraceStr(e);
+                            LogUtils.writeLog(msg,
+                                    stackTrace,
+                                    pc.getProperties().getProperty("pig.logfile"),
+                                    log
+                            );     
+                            log.info(msg);
+                            
+                            if (stop_on_failure) {
+                                int errCode = 6017;
+                               
+                                throw new ExecException(msg, errCode,
+                                        PigException.REMOTE_ENVIRONMENT);
+                            }
+                            
                         }
+                        double prog = ((double)numMRJobsCompl)/totalMRJobs;
+                        notifyProgress(prog, lastProg);
+                        lastProg = prog;
+                        mrp.remove(natOp);
                     }
                 }
                 continue;
@@ -250,8 +274,7 @@ public class MapReduceLauncher extends L
             }
             
             if (!jc.getFailedJobs().isEmpty() ) {
-                if ("true".equalsIgnoreCase(pc.getProperties().getProperty(
-                        "stop.on.failure", "false"))) {
+                if (stop_on_failure){
                     int errCode = 6017;
                     StringBuilder msg = new StringBuilder();
                     
@@ -299,6 +322,10 @@ public class MapReduceLauncher extends L
              
         boolean failed = false;
         
+        if(failedNativeMR.size() > 0){
+            failed = true;
+        }
+        
         // Look to see if any jobs failed.  If so, we need to report that.
         if (failedJobs != null && failedJobs.size() > 0) {
             
@@ -382,6 +409,13 @@ public class MapReduceLauncher extends L
         return PigStatsUtil.getPigStats(ret);
     }
 
+    private String getStackStraceStr(Throwable e) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        e.printStackTrace(ps);
+        return baos.toString();
+    }
+
     /**
      * Log the progress and notify listeners if there is sufficient progress 
      * @param prog current progress
@@ -553,10 +587,7 @@ public class MapReduceLauncher extends L
     class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
         
         public void uncaughtException(Thread thread, Throwable throwable) {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            PrintStream ps = new PrintStream(baos);
-            throwable.printStackTrace(ps);
-            jobControlExceptionStackTrace = baos.toString();    		
+            jobControlExceptionStackTrace = getStackStraceStr(throwable);
             try {	
                 jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
             } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Aug 27 14:51:50 2010
@@ -1669,17 +1669,7 @@ public class LogToPhyTranslationVisitor 
         poNative.setAlias(loNative.getAlias());
         poNative.setNativeMRjar(loNative.getNativeMRJar());
         poNative.setParams(loNative.getParams());
-        poNative.setResultType(loNative.getLoad().getType());
-        
-        currentPlans.push(currentPlan);
-        currentPlan = new PhysicalPlan();
-        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
-                .spawnChildWalker(loNative.getInnerPlan());
-        pushWalker(childWalker);
-        mCurrentWalker.walk(this);
-        popWalker();
-        poNative.setPhysInnerPlan(currentPlan);
-        currentPlan = currentPlans.pop();
+        poNative.setResultType(DataType.BAG);
         
         logToPhyMap.put(loNative, poNative);
         currentPlan.add(poNative);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java Fri Aug 27 14:51:50 2010
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Utils;
@@ -28,40 +27,13 @@ public class PONative extends PhysicalOp
     
     private static final long serialVersionUID = 1L;
 
-    PhysicalPlan physInnerPlan;
     String nativeMRjar;
     String[] params;
-    POStore innerStore;
-    POLoad innerLoad;
-    
+
     public PONative(OperatorKey k) {
         super(k);
     }
 
-    public PhysicalPlan getPhysInnerPlan() {
-        return physInnerPlan;
-    }
-
-    public void setPhysInnerPlan(PhysicalPlan physInnerPlan) {
-        this.physInnerPlan = physInnerPlan;
-        for(PhysicalOperator innerOp : physInnerPlan) {
-            if(innerOp instanceof POStore) {
-                innerStore = (POStore) innerOp;
-            }
-            if(innerOp instanceof POLoad) {
-                innerLoad = (POLoad) innerOp;
-            }
-        }
-    }
-    
-    public POStore getInnerStore() {
-        return innerStore;
-    }
-
-    public POLoad getInnerLoad() {
-        return innerLoad;
-    }
-
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitNative(this);
@@ -70,7 +42,8 @@ public class PONative extends PhysicalOp
     @Override
     public String name() {
         return getAliasString() + "Native" + "('hadoop jar "
-        + nativeMRjar + " " + Utils.getStringFromArray(params) + "')" + " - " + mKey.toString();
+        + nativeMRjar + " " + Utils.getStringFromArray(params) + "')" 
+        + " - " + mKey.toString();
     }
 
     public String getNativeMRjar() {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Aug 27 14:51:50 2010
@@ -50,7 +50,7 @@ public class PlanHelper {
      * @return List of stores (could be empty)
      */
     public static LinkedList<POStore> getStores(PhysicalPlan plan) throws VisitorException {
-        LoadStoreFinder finder = new LoadStoreFinder(plan);
+        LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
 
         finder.visit();
         return finder.getStores();
@@ -62,11 +62,23 @@ public class PlanHelper {
      * @return List of loads (could be empty)
      */
     public static LinkedList<POLoad> getLoads(PhysicalPlan plan) throws VisitorException {
-        LoadStoreFinder finder = new LoadStoreFinder(plan);
+        LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
 
         finder.visit();
         return finder.getLoads();
     }
+    
+    /**
+     * Get all the native mapreduce operators in the plan in the right dependency order
+     * @param plan
+     * @return List of native mapreduce operators (could be empty)
+     */
+    public static LinkedList<PONative> getNativeMRs(PhysicalPlan plan) throws VisitorException {
+        LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
+
+        finder.visit();
+        return finder.getNativeMRs();
+    }    
 
     /**
      * Creates a relative path that can be used to build a temporary
@@ -85,14 +97,16 @@ public class PlanHelper {
         }
     }
 
-    private static class LoadStoreFinder extends PhyPlanVisitor {
+    private static class LoadStoreNativeFinder extends PhyPlanVisitor {
         private LinkedList<POLoad> loads;
         private LinkedList<POStore> stores;
+        private LinkedList<PONative> nativeMRs;
         
-        LoadStoreFinder(PhysicalPlan plan) {
+        LoadStoreNativeFinder(PhysicalPlan plan) {
             super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
             stores = new LinkedList<POStore>();
             loads = new LinkedList<POLoad>();
+            nativeMRs = new LinkedList<PONative>();
         }
         
         @Override
@@ -111,7 +125,13 @@ public class PlanHelper {
             super.visitLoad(load);
             loads.add(load);
         }
-
+        
+        @Override 
+        public void visitNative(PONative nativeMR) throws VisitorException {
+            super.visitNative(nativeMR);
+            nativeMRs.add(nativeMR);
+        }
+        
         public LinkedList<POStore> getStores() {
             return stores;
         }
@@ -119,5 +139,9 @@ public class PlanHelper {
         public LinkedList<POLoad> getLoads() {
             return loads;
         }
+        
+        public LinkedList<PONative> getNativeMRs(){
+            return nativeMRs;
+        }
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java Fri Aug 27 14:51:50 2010
@@ -31,34 +31,16 @@ public class LONative extends Relational
      */
     private static final long serialVersionUID = 1L;
 
-    String nativeMRJar;
-    LogicalPlan innerPlan;
-    String[] params = null;
-    private LOLoad load;
-    private LOStore store;
+    private String nativeMRJar;
+    private String[] params = null;
 
-    public LONative(LogicalPlan plan, OperatorKey k, LogicalPlan innerLP, 
-                    LOStore loStore, LOLoad loLoad, String nativeJar, String[] parameters) {
+    public LONative(LogicalPlan plan, OperatorKey k, 
+            String nativeJar, String[] parameters) {
         super(plan, k);
-        innerPlan = innerLP;
         nativeMRJar = nativeJar;
         params = parameters;
-        load = loLoad;
-        store = loStore;
     }
-    
-    public LogicalPlan getInnerPlan() {
-        return innerPlan;
-    }
-
-    public LOLoad getLoad() {
-        return load;
-    }
-
-    public LOStore getStore() {
-        return store;
-    }
-    
+ 
     @Override
     public List<RequiredFields> getRelevantInputs(int output, int column)
             throws FrontendException {
@@ -74,18 +56,14 @@ public class LONative extends Relational
     }
 
     @Override
-    public Schema getSchema() throws FrontendException {
-        return load.getSchema();
-    }
-
-    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }
 
     @Override
     public String name() {
-        return getAliasString() + "Native " + mKey.scope + "-" + mKey.id +" Store: " + store.name() + " Run: hadoop jar " + nativeMRJar + " " + Utils.getStringFromArray(params) + " Load: " + load.name();
+        return getAliasString() + "Native " + mKey.scope + "-" + mKey.id 
+        + " Run: hadoop jar " + nativeMRJar + " " + Utils.getStringFromArray(params) ;
      }
     
     @Override
@@ -93,4 +71,9 @@ public class LONative extends Relational
         return false;
     }
 
+    @Override
+    public Schema getSchema() throws FrontendException {
+        return null;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Fri Aug 27 14:51:50 2010
@@ -108,17 +108,6 @@ public class LogicalOptimizer extends
         mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
                 new PartitionFilterOptimizer(plan), "LoadPartitionFilterOptimizer"));
         
-        // this one is ordered to be before other optimizations since  later 
-        // optimizations may move the LOFilter that is looks for just after a 
-        // LOLoad
-        rulePlan = new RulePlan();
-        RuleOperator loNat = new RuleOperator(LONative.class, 
-                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
-        rulePlan.add(loNat);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
-                new PartitionFilterOptimizer(plan), "NativePartitionFilterOptimizer"));
-
-
         // Add type casting to plans where the schema has been declared (by
         // user, data, or data catalog).
         rulePlan = new RulePlan();
@@ -138,16 +127,6 @@ public class LogicalOptimizer extends
         mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
                 LOStream.class.getName()), "StreamTypeCastInserter"));
         
-        // Add type casting to plans where the schema has been declared by
-        // user in a statement with native operator.
-        
-        rulePlan = new RulePlan();
-        RuleOperator loNative= new RuleOperator(LONative.class, 
-                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
-        rulePlan.add(loNative);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
-                LONative.class.getName()), "NativeTypeCastInserter"));
-
         if(!turnAllRulesOff) {
 
             // Push up limit wherever possible.

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Fri Aug 27 14:51:50 2010
@@ -35,7 +35,6 @@ import org.apache.pig.Expression.Column;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOFilter;
 import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LONative;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
@@ -105,7 +104,7 @@ public class PartitionFilterOptimizer ex
             		"or empty list.";
             throw new OptimizerException(msg, errCode, PigException.BUG);
         }
-        if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad || nodes.get(0) instanceof LONative)) {
+        if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad )) {
             return false;
         }
         if (!alreadyChecked.add(nodes.get(0))) {
@@ -113,9 +112,7 @@ public class PartitionFilterOptimizer ex
         }
         if(nodes.get(0) instanceof LOLoad) {
             loLoad = (LOLoad)nodes.get(0);
-        } else {
-            loLoad = ((LONative)nodes.get(0)).getLoad();
-        }
+        } 
         List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad);
         if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) {
             return false;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Fri Aug 27 14:51:50 2010
@@ -183,13 +183,7 @@ public class SchemaRemover extends LOVis
         // don't have a way to recover it.
         super.visit(load);
     }
-    
-    @Override
-    protected void visit(LONative load) throws VisitorException{
-        // We treat this in similar way as load
-        super.visit(load);
-    }
-    
+       
     @Override
     protected void visit(LOStore store) throws VisitorException{
         store.unsetSchema();

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Fri Aug 27 14:51:50 2010
@@ -30,7 +30,6 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOCast;
 import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LONative;
 import org.apache.pig.impl.logicalLayer.LOProject;
 import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
@@ -73,9 +72,6 @@ public class TypeCastInserter extends Lo
             if(LOLoad.class.getName().equals(operatorClassName)) {
                 determinedSchema = ((LOLoad)op).getDeterminedSchema();
             }
-            if(LONative.class.getName().equals(operatorClassName)) {
-                determinedSchema = ((LONative)op).getLoad().getDeterminedSchema();
-            }
             for (int i = 0; i < fss.size(); i++) {
                 if (fss.get(i).type != DataType.BYTEARRAY) {
                     if(determinedSchema == null || 
@@ -128,16 +124,6 @@ public class TypeCastInserter extends Lo
             }
     
             return lo;
-        } else if(LONative.class.getName().equals(operatorClassName)) {
-            if(lo == null || !(lo instanceof LONative)) {
-                int errCode = 2005;
-                String msg = "Expected " + LONative.class.getSimpleName()
-                        + ", got "
-                        + (lo == null ? lo : lo.getClass().getSimpleName());
-                throw new OptimizerException(msg, errCode, PigException.BUG);
-            }
-            
-            return lo;
         } else {
             // we should never be called with any other operator class name
             int errCode = 1034;
@@ -167,9 +153,6 @@ public class TypeCastInserter extends Lo
             if(LOLoad.class.getName().equals(operatorClassName)) {
                 determinedSchema = ((LOLoad)lo).getDeterminedSchema();
             }
-            if(LONative.class.getName().equals(operatorClassName)) {
-                determinedSchema = ((LONative)lo).getLoad().getDeterminedSchema();
-            }
             for (int i = 0; i < s.size(); i++) {
                 LogicalPlan p = new LogicalPlan();
                 genPlans.add(p);
@@ -199,8 +182,6 @@ public class TypeCastInserter extends Lo
                                 StreamingCommand command = ((LOStream)lo).getStreamingCommand();
                                 HandleSpec streamOutputSpec = command.getOutputSpec(); 
                                 loadFuncSpec = new FuncSpec(streamOutputSpec.getSpec());
-                            } else if (lo instanceof LONative) {
-                                loadFuncSpec = ((LONative)lo).getLoad().getInputFile().getFuncSpec();
                             } else {
                                 int errCode = 2006;
                                 String msg = "TypeCastInserter invoked with an invalid operator class name: " + lo.getClass().getSimpleName();

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Aug 27 14:51:50 2010
@@ -1509,8 +1509,7 @@ String StringList() : 
 //B = native ('mymr.jar' [, 'other.jar' ...]) A store into 'storeLocation' using storeFunc load 'loadLocation' using loadFunc ['params'];
 LogicalOperator MapReduceClause(LogicalPlan lp) : 
 {
-	LogicalPlan innerPlan = new LogicalPlan();
-	LogicalOperator  loLoad;
+    	LogicalOperator  loLoad;
 	LogicalOperator loStore;
 	LONative loNative;
 	Schema schema;
@@ -1537,11 +1536,19 @@ LogicalOperator MapReduceClause(LogicalP
 	]
 	
 	<STORE>
-	loStore = StoreClause(innerPlan)
+	loStore = StoreClause(lp)
+	{
+	    ((LOStore)loStore).setTmpStore(true);
+	    String inputAlias = ((LOStore)loStore).getAlias();
+	    LogicalOperator input = mapAliasOp.get(inputAlias);
+	    if (input == null)
+	       throw new ParseException("Unable to find alias " + inputAlias);
+	    lp.add(input);
+	    lp.connect(input, loStore);
+	    
+	}
 	<LOAD>
-	loLoad = LoadClause(innerPlan)
-	// We do this so that Store gets a pred etc
-	{innerPlan.connect(loLoad, loStore);}
+	loLoad = LoadClause(lp)
 	[ <AS> 
         (
             LOOKAHEAD(2) "(" schema = TupleSchema() ")" 
@@ -1565,19 +1572,13 @@ LogicalOperator MapReduceClause(LogicalP
 	 }
 	]
 	{
-		loNative = new LONative(lp, new OperatorKey(scope, getNextId()), innerPlan, 
-				(LOStore)loStore, (LOLoad)loLoad, nativeMRJar, params);
+		loNative = new LONative(lp, new OperatorKey(scope, getNextId()),
+		        nativeMRJar, params);
 	
 		lp.add(loNative);
-		
-		String inputAlias = ((LOStore)loStore).getAlias();
-    	LogicalOperator input = mapAliasOp.get(inputAlias);
-        if (input == null)
-            throw new ParseException("Unable to find alias " + inputAlias);
-
-        lp.add(input);
-		lp.connect(input, loNative);
-		return loNative;
+		lp.connect(loStore, loNative);
+		lp.connect(loNative, loLoad);
+		return loLoad;
 	}
 }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Fri Aug 27 14:51:50 2010
@@ -117,10 +117,5 @@ public class InputOutputFileVisitor exte
             throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), ie);
         }
     }
-    
-    @Override
-    protected void visit(LONative nativeMR) throws PlanValidationException{
-        visit(nativeMR.getStore());
-    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Fri Aug 27 14:51:50 2010
@@ -33,6 +33,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LONative;
 import org.apache.pig.impl.logicalLayer.LOSort;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
@@ -393,6 +394,25 @@ public class LogicalPlanMigrationVistor 
         opsMap.put(distinct, newDistinct);
         translateConnection(distinct, newDistinct);
     }
+
+    
+    public void visit(LONative nativeMR) throws VisitorException {
+        org.apache.pig.newplan.logical.relational.LONative newNativeMR = 
+            new org.apache.pig.newplan.logical.relational.LONative(
+                    logicalPlan,
+                    nativeMR.getNativeMRJar(),
+                    nativeMR.getParams()
+            );
+        newNativeMR.setAlias(nativeMR.getAlias());
+        newNativeMR.setRequestedParallelism(nativeMR.getRequestedParallelism());
+        newNativeMR.setCustomPartitioner(nativeMR.getCustomPartitioner());
+        
+        logicalPlan.add(newNativeMR);
+        opsMap.put(nativeMR, newNativeMR);
+        translateConnection(nativeMR, newNativeMR);
+    }
+    
+
     
     public void finish() {
         for(org.apache.pig.newplan.logical.expression.LogicalExpression exp: scalarAliasMap.keySet()) {

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java?rev=990165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java Fri Aug 27 14:51:50 2010
@@ -0,0 +1,134 @@
+package org.apache.pig.newplan.logical.relational;
+
+import java.util.Arrays;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+
+public class LONative extends LogicalRelationalOperator {
+
+    private String nativeMRJar;
+    private String[] params = null;
+//    private LOLoad load;
+//    private LOStore store;
+    
+    public LONative(OperatorPlan plan, String nativeJar, String[] parameters) {
+        super("LONative", plan);
+//        this.store = loStore;
+//        this.load = loLoad;
+        this.nativeMRJar = nativeJar;
+        this.params = parameters;
+        
+    }
+
+    @Override
+    public LogicalSchema getSchema() throws FrontendException {
+//        return load.getSchema();
+        return null;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new FrontendException("Expected LogicalPlanVisitor", 2223);
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    @Override
+    public boolean isEqual(Operator obj) throws FrontendException {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        LONative other = (LONative) obj;
+
+//        if (load == null) {
+//            if (other.load != null)
+//                return false;
+//        } else if (!load.equals(other.load))
+//            return false;
+        if (nativeMRJar == null) {
+            if (other.nativeMRJar != null)
+                return false;
+        } else if (!nativeMRJar.equals(other.nativeMRJar))
+            return false;
+        if (!Arrays.equals(params, other.params))
+            return false;
+//        if (store == null) {
+//            if (other.store != null)
+//                return false;
+//        } else if (!store.equals(other.store))
+//            return false;
+//        
+        //check predecessors and schema
+        if(! checkEquality(other))
+            return false;
+        
+        return true;
+    }
+
+
+    /**
+     * @return the nativeMRJar
+     */
+    public String getNativeMRJar() {
+        return nativeMRJar;
+    }
+
+    /**
+     * @param nativeMRJar the nativeMRJar to set
+     */
+    public void setNativeMRJar(String nativeMRJar) {
+        this.nativeMRJar = nativeMRJar;
+    }
+
+    /**
+     * @return the params
+     */
+    public String[] getParams() {
+        return params;
+    }
+
+    /**
+     * @param params the params to set
+     */
+    public void setParams(String[] params) {
+        this.params = params;
+    }
+
+//    /**
+//     * @return the load
+//     */
+//    public LOLoad getLoad() {
+//        return load;
+//    }
+//
+//    /**
+//     * @param load the load to set
+//     */
+//    public void setLoad(LOLoad load) {
+//        this.load = load;
+//    }
+//
+//    /**
+//     * @return the store
+//     */
+//    public LOStore getStore() {
+//        return store;
+//    }
+//
+//    /**
+//     * @param store the store to set
+//     */
+//    public void setStore(LOStore store) {
+//        this.store = store;
+//    }
+//
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Aug 27 14:51:50 2010
@@ -47,6 +47,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -150,6 +151,42 @@ public class LogToPhyTranslationVisitor 
 //        System.err.println("Exiting Load");
     }
     
+    
+    @Override
+    public void visit(LONative loNative) throws FrontendException{     
+        String scope = DEFAULT_SCOPE;
+        
+        PONative poNative = new PONative(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+        poNative.setAlias(loNative.getAlias());
+        poNative.setNativeMRjar(loNative.getNativeMRJar());
+        poNative.setParams(loNative.getParams());
+        poNative.setResultType(DataType.BAG);
+
+        logToPhyMap.put(loNative, poNative);
+        currentPlan.add(poNative);
+        
+        List<Operator> op = loNative.getPlan().getPredecessors(loNative);
+
+        PhysicalOperator from;
+        if(op != null) {
+            from = logToPhyMap.get(op.get(0));
+        } else {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Native." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        
+        try {
+            currentPlan.connect(from, poNative);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        
+    }
+    
     @Override
     public void visit(LOFilter filter) throws FrontendException {
         String scope = DEFAULT_SCOPE;

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Fri Aug 27 14:51:50 2010
@@ -89,4 +89,7 @@ public abstract class LogicalRelationalN
     
     public void visit(LOStream loStream) throws FrontendException {
     }
+
+    public void visit(LONative nativeMR) throws FrontendException{     
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Fri Aug 27 14:51:50 2010
@@ -17,13 +17,14 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
@@ -34,13 +35,13 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestNativeMapReduce extends TestCase {
+public class TestNativeMapReduce  {
     static MiniCluster cluster = MiniCluster.buildCluster();
     private PigServer pigServer;
     // the jar has been created using the source at
     // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822
     private String jarFileName = "test//org/apache/pig/test/data/TestWordCount.jar";
-    
+    private String exp_msg_prefix = "Check if expected results contains: ";
     TupleFactory mTf = TupleFactory.getInstance();
     BagFactory mBf = BagFactory.getInstance();
 
@@ -50,9 +51,9 @@ public class TestNativeMapReduce extends
     }*/
 
     @Before
-    @Override
     public void setUp() throws Exception{
         FileLocalizer.setR(new Random());
+        //FileLocalizer.setInitialized(false);
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
         //createWordCountJar();
@@ -96,17 +97,17 @@ public class TestNativeMapReduce extends
         Iterator<Tuple> iter = pigServer.openIterator("C");
         Tuple t;
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());
         
@@ -117,17 +118,17 @@ public class TestNativeMapReduce extends
         // check in interactive mode
         iter = pigServer.openIterator("B");
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());
         
@@ -173,17 +174,17 @@ public class TestNativeMapReduce extends
         Iterator<Tuple> iter = pigServer.openIterator("C");
         Tuple t;
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());
         
@@ -193,17 +194,17 @@ public class TestNativeMapReduce extends
         // check in interactive mode
         iter = pigServer.openIterator("B");
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());
         
@@ -245,33 +246,33 @@ public class TestNativeMapReduce extends
         Iterator<Tuple> iter = pigServer.openIterator("C");
         Tuple t;
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());
 
         iter = pigServer.openIterator("B");
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());
         
@@ -310,17 +311,17 @@ public class TestNativeMapReduce extends
         Iterator<Tuple> iter = pigServer.openIterator("C");
         Tuple t;
         
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-        assertTrue(iter.hasNext());
+        assertTrue("iter.hasNext()",iter.hasNext());
         t = iter.next();
-        assertTrue(results.contains(t.toString()));
+        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
         assertFalse(iter.hasNext());