Posted to commits@pig.apache.org by th...@apache.org on 2010/08/30 22:35:57 UTC

svn commit: r990934 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/

Author: thejas
Date: Mon Aug 30 20:35:57 2010
New Revision: 990934

URL: http://svn.apache.org/viewvc?rev=990934&view=rev
Log:
PIG-1570: native mapreduce operator MR job does not follow same failure handling logic as other pig MR jobs

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 20:35:57 2010
@@ -171,6 +171,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1570: native mapreduce operator MR job does not follow same failure handling logic as other pig MR jobs (thejas)
+
 PIG-1343: pig_log file missing even though Main tells it is creating one and
 an M/R job fails (nrai via rding)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Aug 30 20:35:57 2010
@@ -191,17 +191,18 @@ public class MapReduceLauncher extends L
         	// Initially, all jobs are in wait state.
             List<Job> jobsWithoutIds = jc.getWaitingJobs();
             log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
-            
+            // notify listeners about jobs submitted
             ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
             
-            String jobTrackerAdd;
-            String port;
+            // determine job tracker URL
             String jobTrackerLoc;
             JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
             try {
-                port = jobConf.get("mapred.job.tracker.http.address");
-                jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
-                jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":")) + port.substring(port.indexOf(":"));
+                String port = jobConf.get("mapred.job.tracker.http.address");
+                String jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+                
+                jobTrackerLoc = jobTrackerAdd.substring(0, jobTrackerAdd.indexOf(":"))
+                        + port.substring(port.indexOf(":"));
             }
             catch(Exception e){
                 // Could not get the job tracker location, most probably we are running in local mode.
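
For context, the change above only scopes the two temporaries into the try block; the value computed is still the job tracker web UI location, built by joining the host part of the job tracker RPC address (the HExecutionEngine.JOB_TRACKER_LOCATION property) with the port part of "mapred.job.tracker.http.address". A standalone sketch of that composition, using hypothetical literal values rather than a real JobConf, would look roughly like this (not part of the patch):

    // Sketch only: shows how the job tracker web UI location is derived from
    // the two configuration values. The literals below are hypothetical; in
    // the patch they come from the JobConf.
    public class JobTrackerLocExample {
        public static void main(String[] args) {
            String jobTrackerAdd = "tracker-host:9001";   // mapred.job.tracker
            String port = "0.0.0.0:50030";                // mapred.job.tracker.http.address
            String jobTrackerLoc =
                    jobTrackerAdd.substring(0, jobTrackerAdd.indexOf(":"))
                    + port.substring(port.indexOf(":"));
            System.out.println(jobTrackerLoc);            // prints "tracker-host:50030"
        }
    }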

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java Mon Aug 30 20:35:57 2010
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.util.RunJar;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -75,20 +77,32 @@ public class NativeMapReduceOper extends
         RunJarSecurityManager secMan = new RunJarSecurityManager();
         try {
             RunJar.main(getNativeMRParams());
+            PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
         } catch (SecurityException se) {
             if(secMan.getExitInvoked()) {
                 if(secMan.getExitCode() != 0) {
                     throw new JobCreationException("Native job returned with non-zero return code");
                 }
                 else {
-                    PigStatsUtil.addNativeJobStats(PigStats.get(), this);
+                    PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
                 }
             }
         } catch (Throwable t) {
-            throw new JobCreationException("Cannot run native mapreduce job "+ t.getMessage());
+            JobCreationException e = new JobCreationException(
+                    "Cannot run native mapreduce job "+ t.getMessage(), t);
+            PigStatsUtil.addNativeJobStats(PigStats.get(), this, false, e);
+            throw e;
         } finally {
             secMan.retire();
         }
     }
     
+    @Override
+    public String name() {
+        return "MapReduce - " + mKey.toString() + "\n"
+                + " Native MapReduce - jar : " + nativeMRJar + ", params: " + Arrays.toString(params);
+    }
+
 }
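
The failure handling above hinges on RunJarSecurityManager, a Pig class not shown in this diff: RunJar.main() finishes the embedded job with System.exit(), and the security manager turns that call into a catchable SecurityException while recording the exit code, so the operator can register success or failure through PigStatsUtil.addNativeJobStats before returning or rethrowing. A minimal sketch of the general technique (hypothetical class name, not Pig's actual implementation) might look like:

    // Sketch only: a SecurityManager that intercepts System.exit() from an
    // embedded job, records the exit code, and throws SecurityException so
    // the host JVM keeps running. The accessors mirror the getExitInvoked()
    // and getExitCode() calls made in the catch block above.
    public class ExitInterceptingSecurityManager extends SecurityManager {
        private boolean exitInvoked = false;
        private int exitCode = 0;

        @Override
        public void checkExit(int status) {
            exitInvoked = true;
            exitCode = status;
            throw new SecurityException("exit(" + status + ") intercepted");
        }

        @Override
        public void checkPermission(java.security.Permission perm) {
            // permit everything else
        }

        public boolean getExitInvoked() { return exitInvoked; }
        public int getExitCode() { return exitCode; }
    }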

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Aug 30 20:35:57 2010
@@ -43,7 +43,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon Aug 30 20:35:57 2010
@@ -294,15 +294,23 @@ public abstract class PigStatsUtil {
         return js;
     }
     
-    public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr) {
+    public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
+            boolean success) {
+        return addNativeJobStats(ps, mr, success, null);
+    }
+    
+    public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
+            boolean success, Exception e) {
         JobStats js = ps.addJobStatsForNative(mr);
         if(js == null) {
             LOG.warn("unable to add native job stats");
         } else {
-            js.setSuccessful(true);
+            js.setSuccessful(success);
+            if(e != null)
+                js.setBackendException(e);
         }
         return js;
-    }
+    }    
     
     private static JobStats accumulateSuccessStatistics(PigStats ps, Job job) {
         JobStats js = ps.addJobStats(job);

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Mon Aug 30 20:35:57 2010
@@ -72,6 +72,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LONative;
 import org.apache.pig.impl.logicalLayer.LOSort;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOStream;
@@ -491,17 +492,18 @@ public class ScriptState {
             if (mro instanceof NativeMapReduceOper) {
                 feature.set(PIG_FEATURE.NATIVE.ordinal());
             }
-            try {
-                new FeatureVisitor(mro.mapPlan, feature).visit();
-                if (mro.reducePlan.isEmpty()) { 
-                    feature.set(PIG_FEATURE.MAP_ONLY.ordinal());                    
-                } else {
-                    new FeatureVisitor(mro.reducePlan, feature).visit();
+            else { // if it is a native MR operator, don't explore its plans
+                try {
+                    new FeatureVisitor(mro.mapPlan, feature).visit();
+                    if (mro.reducePlan.isEmpty()) { 
+                        feature.set(PIG_FEATURE.MAP_ONLY.ordinal());                    
+                    } else {
+                        new FeatureVisitor(mro.reducePlan, feature).visit();
+                    }
+                } catch (VisitorException e) {
+                    LOG.warn("Feature visitor failed", e);
                 }
-            } catch (VisitorException e) {
-                LOG.warn("Feature visitor failed", e);
             }
-            
             StringBuilder sb = new StringBuilder();
             for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
                 if (sb.length() > 0) sb.append(",");             
@@ -668,6 +670,12 @@ public class ScriptState {
         protected void visit(LOUnion op) throws VisitorException {
             feature.set(PIG_FEATURE.UNION.ordinal());
         }
+        
+        @Override
+        protected void visit(LONative n) throws VisitorException {
+            feature.set(PIG_FEATURE.NATIVE.ordinal());
+        }
+
     }
     
     private static class AliasVisitor extends PhyPlanVisitor {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=990934&r1=990933&r2=990934&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Mon Aug 30 20:35:57 2010
@@ -17,43 +17,66 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestNativeMapReduce  {
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    private PigServer pigServer;
+    
+    // NOTE:
+    // Testing NativeMapReduce in LOCAL mode from this unit test setup is not easy
+    // (i.e. the current WordCount.jar does not work as-is):
+    // the presence of ~/pigtest/conf/hadoop-site.xml, created by MiniCluster,
+    // in the classpath makes the MR job try to contact the MiniCluster.
+    // If the MiniCluster shutdown is changed to delete the file, the other
+    // test cases fail because the file in the classpath no longer exists.
+    
     // the jar has been created using the source at
     // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822
     private String jarFileName = "test//org/apache/pig/test/data/TestWordCount.jar";
     private String exp_msg_prefix = "Check if expected results contains: ";
-    TupleFactory mTf = TupleFactory.getInstance();
-    BagFactory mBf = BagFactory.getInstance();
-
+    final static String INPUT_FILE = "TestMapReduceInputFile";
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer = null;
+    
     /**
      * TODO - Move to runtime jar creation approach
     private void createWordCountJar() {
     }*/
+    
+    @BeforeClass
+    public static void oneTimeSetup() throws Exception{
+        String[] input = {
+                "one",
+                "two",
+                "three",
+                "three",
+                "two",
+                "three"
+        };
+        Util.createInputFile(cluster, INPUT_FILE, input);
+    }
 
     @Before
     public void setUp() throws Exception{
-        FileLocalizer.setR(new Random());
-        //FileLocalizer.setInitialized(false);
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
         //createWordCountJar();
@@ -61,272 +84,286 @@ public class TestNativeMapReduce  {
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
+        Util.deleteFile(cluster, INPUT_FILE);
         cluster.shutDown();
     }
+      
 
     // See PIG-506
     @Test
     public void testNativeMRJobSimple() throws Exception{
-        String[] input = {
-                "one",
-                "two",
-                "three",
-                "three",
-                "two",
-                "three"
-        };
-        Util.createInputFile(cluster, "table_testNativeMRJobSimple", input);
+        try{
+            Collection<String> results = new HashSet<String>();
+            results.add("(one,1)");
+            results.add("(two,2)");
+            results.add("(three,3)");
+
+            pigServer.setBatchOn();
+            pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+            pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+                    "Store A into 'table_testNativeMRJobSimple_input' "+
+                    "Load 'table_testNativeMRJobSimple_output' "+
+            "`WordCount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`;");
+            pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
+            List<ExecJob> execJobs = pigServer.executeBatch();
+
+            assertEquals("num of jobs", execJobs.size(), 1);
+            boolean foundNativeFeature = false;
+            for(ExecJob job : execJobs){
+                assertEquals("job status", job.getStatus(),JOB_STATUS.COMPLETED);
+                if(job.getStatistics().getFeatures().contains("NATIVE")){
+                    foundNativeFeature = true;
+                }
+            }
+            assertTrue("foundNativeFeature", foundNativeFeature);
+            
+            // Check the output
+            pigServer.registerQuery("C = load 'table_testNativeMRJobSimpleDir';");
+
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            Tuple t;
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+
+            // We have to manually delete intermediate mapreduce files
+            Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
+            Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
+
+            // check in interactive mode
+            iter = pigServer.openIterator("B");
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+        }
+        finally{
+            // We have to manually delete intermediate mapreduce files
+            Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
+            Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
+            Util.deleteFile(cluster,"table_testNativeMRJobSimpleDir");
+        }
+    }
 
-        Collection<String> results = new HashSet<String>();
-        results.add("(one,1)");
-        results.add("(two,2)");
-        results.add("(three,3)");
-        
-        pigServer.setBatchOn();
-        pigServer.registerQuery("A = load 'table_testNativeMRJobSimple';");
-        pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
-                "Store A into 'table_testNativeMRJobSimple_input' "+
-                "Load 'table_testNativeMRJobSimple_output' "+
-        "`WordCount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`;");
-        pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
-        pigServer.executeBatch();
-
-        // Check the output
-        pigServer.registerQuery("C = load 'table_testNativeMRJobSimpleDir';");
-
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        Tuple t;
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-        
-        // We have to manually delete intermediate mapreduce files
-        Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
-        Util.deleteFile(cluster, "table_testNativeMRJobSimple_output");
-        
-        // check in interactive mode
-        iter = pigServer.openIterator("B");
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-        
-        // We have to manually delete intermediate mapreduce files
-        Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
-        Util.deleteFile(cluster, "table_testNativeMRJobSimple_output");
-        
-        Util.deleteFile(cluster, "table_testNativeMRJobSimple");
-        Util.deleteFile(cluster, "table_testNativeMRJobSimpleDir");
+    
+    @Test
+    public void testNativeMRJobSimpleFailure() throws Exception{
+        try{
+            // test that the correct return code is obtained when the query fails:
+            // the native MR job writes its output to an existing file and should fail
+            
+            Collection<String> results = new HashSet<String>();
+            results.add("(one,1)");
+            results.add("(two,2)");
+            results.add("(three,3)");
+
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+            
+            pigServer.setBatchOn();
+            pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+            pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+                    "Store A into 'table_testNativeMRJobSimple_input' "+
+                    "Load 'table_testNativeMRJobSimple_output' "+
+            "`WordCount table_testNativeMRJobSimple_input " + INPUT_FILE + "`;");
+            pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
+//            List<ExecJob> execJobs = pigServer.executeBatch();
+
+
+            assertTrue("job failed", PigStats.get().getReturnCode() != 0);
+   
+        }
+        finally{
+            // We have to manually delete intermediate mapreduce files
+            Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
+            Util.deleteFile(cluster, "table_testNativeMRJobSimpleDir");
+        }
     }
 
+    
     // See PIG-506
     @Test
     public void testNativeMRJobMultiStoreOnPred() throws Exception{
-        String[] input = {
-                "one",
-                "two",
-                "three",
-                "three",
-                "two",
-                "three"
-        };
-        Util.createInputFile(cluster, "table_testNativeMRJobMultiStoreOnPred", input);
+        try{
 
-        Collection<String> results = new HashSet<String>();
-        results.add("(one,1)");
-        results.add("(two,2)");
-        results.add("(three,3)");
-        
-        pigServer.setBatchOn();
-        pigServer.registerQuery("A = load 'table_testNativeMRJobMultiStoreOnPred';");
-        pigServer.registerQuery("Store A into 'testNativeMRJobMultiStoreOnPredTemp';");
-        pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
-                "Store A into 'table_testNativeMRJobMultiStoreOnPred_input' "+
-                "Load 'table_testNativeMRJobMultiStoreOnPred_output' "+
-        "`WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;");
-        pigServer.registerQuery("Store B into 'table_testNativeMRJobMultiStoreOnPredDir';");
-        pigServer.executeBatch();
-
-        // Check the output
-        pigServer.registerQuery("C = load 'table_testNativeMRJobMultiStoreOnPredDir';");
-
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        Tuple t;
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-        
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_input");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_output");
-
-        // check in interactive mode
-        iter = pigServer.openIterator("B");
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-        
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_input");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred_output");
-        
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPred");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiStoreOnPredDir");
+            Collection<String> results = new HashSet<String>();
+            results.add("(one,1)");
+            results.add("(two,2)");
+            results.add("(three,3)");
+
+            pigServer.setBatchOn();
+            pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+            pigServer.registerQuery("Store A into 'testNativeMRJobMultiStoreOnPredTemp';");
+            pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+                    "Store A into 'table_testNativeMRJobMultiStoreOnPred_input' "+
+                    "Load 'table_testNativeMRJobMultiStoreOnPred_output' "+
+            "`WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;");
+            pigServer.registerQuery("Store B into 'table_testNativeMRJobMultiStoreOnPredDir';");
+            pigServer.executeBatch();
+
+            // Check the output
+            pigServer.registerQuery("C = load 'table_testNativeMRJobMultiStoreOnPredDir';");
+
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            Tuple t;
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
+
+            // check in interactive mode
+            iter = pigServer.openIterator("B");
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+        }
+        finally{
+            Util.deleteFile(cluster,"testNativeMRJobMultiStoreOnPredTemp");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPredDir");
+        }
     }
 
     // See PIG-506
     @Test
     public void testNativeMRJobMultiQueryOpt() throws Exception{
-        String[] input = {
-                "one",
-                "two",
-                "three",
-                "three",
-                "two",
-                "three"
-        };
-        Util.createInputFile(cluster, "table_testNativeMRJobMultiQueryOpt", input);
-
-        Collection<String> results = new HashSet<String>();
-        results.add("(one,1)");
-        results.add("(two,2)");
-        results.add("(three,3)");
-        
-        pigServer.registerQuery("A = load 'table_testNativeMRJobMultiQueryOpt';");
-        pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
-                "Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' "+
-                "Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+
-        "`WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;");
-        pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " +
-                "Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' "+
-                "Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+
-        "`WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;");
-
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        Tuple t;
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-
-        iter = pigServer.openIterator("B");
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-        
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_inputB");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_outputB");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_inputC");
-        Util.deleteFile(cluster, "table_testNativeMRJobMultiQueryOpt_outputC");
+        try{
+            Collection<String> results = new HashSet<String>();
+            results.add("(one,1)");
+            results.add("(two,2)");
+            results.add("(three,3)");
+
+            pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+            pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+                    "Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' "+
+                    "Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+
+            "`WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;");
+            pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " +
+                    "Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' "+
+                    "Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+
+            "`WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;");
+
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            Tuple t;
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+
+            iter = pigServer.openIterator("B");
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+        }finally{
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_inputB");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_outputB");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_inputC");
+            Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_outputC");
+        }
     }
-    
+
     // See PIG-506
     @Test
     public void testNativeMRJobTypeCastInserter() throws Exception{
-        String[] input = {
-                "one",
-                "two",
-                "three",
-                "three",
-                "two",
-                "three"
-        };
-        Util.createInputFile(cluster, "table_testNativeMRJobTypeCastInserter", input);
-
-        Collection<String> results = new HashSet<String>();
-        results.add("(2)");
-        results.add("(3)");
-        results.add("(4)");
-        
-        pigServer.registerQuery("A = load 'table_testNativeMRJobTypeCastInserter';");
-        pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
-                "Store A into 'table_testNativeMRJobTypeCastInserter_input' "+
-                "Load 'table_testNativeMRJobTypeCastInserter_output' as (name:chararray, count: int)"+
-        "`WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;");
-        pigServer.registerQuery("C = foreach B generate count+1;");
-        
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        Tuple t;
-        
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertTrue("iter.hasNext()",iter.hasNext());
-        t = iter.next();
-        assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
-        assertFalse(iter.hasNext());
-        
-        Util.deleteFile(cluster, "table_testNativeMRJobTypeCastInserter");
-        Util.deleteFile(cluster, "table_testNativeMRJobTypeCastInserter_input");
-        Util.deleteFile(cluster, "table_testNativeMRJobTypeCastInserter_output");
+        try{
+            Collection<String> results = new HashSet<String>();
+            results.add("(2)");
+            results.add("(3)");
+            results.add("(4)");
+            
+            pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+            pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
+                    "Store A into 'table_testNativeMRJobTypeCastInserter_input' "+
+                    "Load 'table_testNativeMRJobTypeCastInserter_output' as (name:chararray, count: int)"+
+            "`WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;");
+            pigServer.registerQuery("C = foreach B generate count+1;");
+
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            Tuple t;
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertTrue("iter.hasNext()",iter.hasNext());
+            t = iter.next();
+            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
+
+            assertFalse(iter.hasNext());
+        }finally{     
+            Util.deleteFile(cluster,"table_testNativeMRJobTypeCastInserter_input");
+            Util.deleteFile(cluster,"table_testNativeMRJobTypeCastInserter_output");
+        }
     }
+
 }