Posted to commits@pig.apache.org by ga...@apache.org on 2012/09/20 05:07:57 UTC

svn commit: r1387846 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java test/org/apache/pig/test/TestStore.java test/org/apache/pig/test/Util.java

Author: gates
Date: Thu Sep 20 03:07:57 2012
New Revision: 1387846

URL: http://svn.apache.org/viewvc?rev=1387846&view=rev
Log:
PIG-2712 Pig does not call OutputCommitter.abortJob() on the underlying OutputFormat
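
Background on the fix: PigOutputCommitter already forwarded commitJob() and
cleanupJob() to the committers of the wrapped StoreFuncs, but never
abortJob(JobContext, JobStatus.State), so a failed job skipped the abort
hook of the underlying OutputFormat's committer. The patch below adds that
delegation. As a rough sketch of what the fix enables (the class here is
hypothetical, not part of this commit), a store's committer can now rely on
abortJob() running after a job failure, e.g. to remove partial output:

    // Hypothetical committer, for illustration only: with this fix, Pig
    // forwards abortJob() to committers like this one when a job fails
    // (on Hadoop 0.20.203+/0.23).
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus.State;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CleanupOnAbortCommitter extends OutputCommitter {
        private final Path outputPath;

        public CleanupOnAbortCommitter(Path outputPath) {
            this.outputPath = outputPath;
        }

        @Override
        public void abortJob(JobContext context, State state) throws IOException {
            // Remove whatever the failed job left behind.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            fs.delete(outputPath, true);
        }

        // The remaining lifecycle hooks are no-ops in this sketch.
        @Override public void setupJob(JobContext context) throws IOException {}
        @Override public void setupTask(TaskAttemptContext context) throws IOException {}
        @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { return false; }
        @Override public void commitTask(TaskAttemptContext context) throws IOException {}
        @Override public void abortTask(TaskAttemptContext context) throws IOException {}
    }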

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    pig/trunk/test/org/apache/pig/test/TestStore.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 20 03:07:57 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2712: Pig does not call OutputCommitter.abortJob() on the underlying OutputFormat (rohini via gates)
+
 PIG-2918: Avoid Spillable bag overhead where possible (dvryaboy)
 
 PIG-2900: Streaming should provide conf settings in the environment (dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Sep 20 03:07:57 2012
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.ResourceSchema;
@@ -143,7 +144,7 @@ public class PigOutputCommitter extends 
             }
         }
     }
-    
+
     @Override
     public void cleanupJob(JobContext context) throws IOException {
         // call clean up on all map and reduce committers
@@ -166,7 +167,7 @@ public class PigOutputCommitter extends 
         }
        
     }
-
+    
     // This method will only be called in 20.203+/0.23
     public void commitJob(JobContext context) throws IOException {
         // call commitJob on all map and reduce committers
@@ -204,6 +205,42 @@ public class PigOutputCommitter extends 
             }
         }
     }
+    
+    // This method will only be called in 20.203+/0.23
+    public void abortJob(JobContext context, State state) throws IOException {
+        // call abortJob on all map and reduce committers
+        for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
+            if (mapCommitter.first!=null) {
+                JobContext updatedContext = setUpContext(context,
+                        mapCommitter.second);
+                try {
+                    // Use reflection; 20.2 does not have such a method
+                    Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+                    m.setAccessible(true);
+                    m.invoke(mapCommitter.first, updatedContext, state);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+                storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
+            }
+        }
+        for (Pair<OutputCommitter, POStore> reduceCommitter :
+            reduceOutputCommitters) {
+            if (reduceCommitter.first!=null) {
+                JobContext updatedContext = setUpContext(context,
+                        reduceCommitter.second);
+                try {
+                    // Use reflection; 20.2 does not have such a method
+                    Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+                    m.setAccessible(true);
+                    m.invoke(reduceCommitter.first, updatedContext, state);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+                storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
+            }
+        }
+    }
 
 
     @Override

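A note on the reflection in the hunk above: Pig still compiles against
Hadoop 0.20.2, whose OutputCommitter has no abortJob(JobContext, State),
so the patch resolves the method at runtime instead of calling it
directly. On a 0.20.203+/0.23 classpath the two calls below are
equivalent (a standalone sketch, not code from this commit; the class
name is made up):

    // Sketch of the compile-time constraint behind the reflection in
    // PigOutputCommitter.abortJob(); the class name is hypothetical.
    import java.io.IOException;
    import java.lang.reflect.Method;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus.State;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    class AbortJobCalls {
        // What the patch does: look abortJob up at runtime, so the source
        // still compiles against 0.20.2, where the method does not exist.
        static void abortReflectively(OutputCommitter committer,
                JobContext context, State state) throws IOException {
            try {
                Method m = committer.getClass().getMethod("abortJob",
                        JobContext.class, State.class);
                m.setAccessible(true);
                m.invoke(committer, context, state);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        // The direct equivalent, which only compiles on 0.20.203+/0.23.
        static void abortDirectly(OutputCommitter committer,
                JobContext context, State state) throws IOException {
            committer.abortJob(context, state);
        }
    }
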
Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Thu Sep 20 03:07:57 2012
@@ -23,16 +23,24 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
@@ -45,6 +53,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -96,14 +105,15 @@ public class TestStore extends junit.fra
     private static final String FAIL_UDF_NAME
     = "org.apache.pig.test.TestStore\\$FailUDF";
     private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts"; 
-    
+    private static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName();
+
     @Override
     @Before
     public void setUp() throws Exception {
         pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pc = pig.getPigContext();
-        inputFileName = "/tmp/TestStore-" + new Random().nextLong() + ".txt";
-        outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+        inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt";
+        outputFileName = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
         
         DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
     }
@@ -111,16 +121,15 @@ public class TestStore extends junit.fra
     @Override
     @After
     public void tearDown() throws Exception {
-        Util.deleteFile(cluster, inputFileName);
-        Util.deleteFile(cluster, outputFileName);
-        new File(outputFileName).delete();
+        Util.deleteDirectory(new File(TESTDIR));
+        Util.deleteFile(cluster, TESTDIR);
     }
 
     private void storeAndCopyLocally(DataBag inpDB) throws Exception {
         setUpInputFileOnCluster(inpDB);
         String script = "a = load '" + inputFileName + "'; " +
                 "store a into '" + outputFileName + "' using PigStorage('\t');" +
-                "fs -ls /tmp";
+                "fs -ls " + TESTDIR;
         pig.setBatchOn();
         Util.registerMultiLineQuery(pig, script);
         pig.executeBatch();
@@ -373,7 +382,15 @@ public class TestStore extends junit.fra
     @Test
     public void testSetStoreSchema() throws Exception {
         PigServer ps = null;
-        String storeSchemaOutputFile = outputFileName + "_storeSchema_test";
+        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+        filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE);
         try {
             ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
             String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
@@ -383,37 +400,38 @@ public class TestStore extends junit.fra
             		DUMMY_STORE_CLASS_NAME + "();";
             
             for (ExecType execType : modes) {
+                FileLocalizer.setInitialized(false);
                 if(execType == ExecType.MAPREDUCE) {
                     ps = new PigServer(ExecType.MAPREDUCE, 
                             cluster.getProperties());
-                    Util.deleteFile(ps.getPigContext(), inputFileName);
-                    Util.deleteFile(ps.getPigContext(), outputFileName);
-                    Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
                 } else {
                     Properties props = new Properties();                                          
                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
                     ps = new PigServer(ExecType.LOCAL, props);
-                    Util.deleteFile(ps.getPigContext(), inputFileName);
-                    Util.deleteFile(ps.getPigContext(), outputFileName);
-                    Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+                    if (Util.isHadoop1_0()) {
+                        // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce
+                        // OutputCommitter) is fixed only in 0.23.1
+                        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE);
+                    }
                 }
                 ps.setBatchOn();
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
                 Util.createInputFile(ps.getPigContext(), 
                         inputFileName, inputData);
                 Util.registerMultiLineQuery(ps, script);
                 ps.executeBatch();
-                assertEquals(
-                        "Checking if file indicating that storeSchema was " +
-                        "called exists in " + execType + " mode", true, 
-                        Util.exists(ps.getPigContext(), storeSchemaOutputFile));
+                for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+                    String condition = entry.getValue() ? "" : "not";
+                    assertEquals("Checking if file " + entry.getKey() +
+                            " does " + condition + " exists in " + execType +
+                            " mode", (boolean) entry.getValue(),
+                            Util.exists(ps.getPigContext(), entry.getKey()));
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail("Exception encountered - hence failing:" + e);
-        } finally {
-            Util.deleteFile(ps.getPigContext(), inputFileName);
-            Util.deleteFile(ps.getPigContext(), outputFileName);
-            Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
         }
     }
     
@@ -439,10 +457,7 @@ public class TestStore extends junit.fra
                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
                     ps = new PigServer(ExecType.LOCAL, props);
                 }
-                Util.deleteFile(ps.getPigContext(), inputFileName);
-                Util.deleteFile(ps.getPigContext(), outputFileName);
-                Util.deleteFile(ps.getPigContext(), cleanupFailFile);
-                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
                 ps.setBatchOn();
                 Util.createInputFile(ps.getPigContext(), 
                         inputFileName, inputData);
@@ -460,11 +475,6 @@ public class TestStore extends junit.fra
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail("Exception encountered - hence failing:" + e);
-        } finally {
-            Util.deleteFile(ps.getPigContext(), inputFileName);
-            Util.deleteFile(ps.getPigContext(), outputFileName);
-            Util.deleteFile(ps.getPigContext(), cleanupFailFile);
-            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
         }
     }
     
@@ -472,15 +482,31 @@ public class TestStore extends junit.fra
     @Test
     public void testCleanupOnFailureMultiStore() throws Exception {
         PigServer ps = null;
-        String outputFileName1 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
-        String outputFileName2 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
-        String cleanupSuccessFile1 = outputFileName1 + "_cleanupOnFailure_succeeded1";
-        String cleanupFailFile1 = outputFileName1 + "_cleanupOnFailure_failed1";
-        String cleanupSuccessFile2 = outputFileName2 + "_cleanupOnFailure_succeeded2";
-        String cleanupFailFile2 = outputFileName2 + "_cleanupOnFailure_failed2";
+        String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+        String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+        
+        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
         
         try {
-            ExecType[] modes = new ExecType[] { /*ExecType.LOCAL, */ExecType.MAPREDUCE};
+            ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
             String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
             
             // though the second store should
@@ -493,6 +519,7 @@ public class TestStore extends junit.fra
                     DUMMY_STORE_CLASS_NAME + "('false', '2');"; 
             
             for (ExecType execType : modes) {
+                FileLocalizer.setInitialized(false);
                 if(execType == ExecType.MAPREDUCE) {
                     ps = new PigServer(ExecType.MAPREDUCE, 
                             cluster.getProperties());
@@ -500,50 +527,38 @@ public class TestStore extends junit.fra
                     Properties props = new Properties();                                          
                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
                     ps = new PigServer(ExecType.LOCAL, props);
+                    // LocalJobRunner does not call abortTask
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
+                    if (Util.isHadoop1_0()) {
+                        // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce
+                        // OutputCommitter) is fixed only in 0.23.1
+                        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
+                    }
                 }
-                Util.deleteFile(ps.getPigContext(), inputFileName);
-                Util.deleteFile(ps.getPigContext(), outputFileName1);
-                Util.deleteFile(ps.getPigContext(), outputFileName2);
-                Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
-                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
-                Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
-                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
                 ps.setBatchOn();
                 Util.createInputFile(ps.getPigContext(), 
                         inputFileName, inputData);
                 Util.registerMultiLineQuery(ps, script);
                 ps.executeBatch();
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure failed " +
-                        " does not exists in " + execType + " mode", false, 
-                        Util.exists(ps.getPigContext(), cleanupFailFile1));
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure failed " +
-                        " does not exists in " + execType + " mode", false, 
-                        Util.exists(ps.getPigContext(), cleanupFailFile2));
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure was " +
-                        "successfully called exists in " + execType + " mode", true, 
-                        Util.exists(ps.getPigContext(), cleanupSuccessFile1));
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure was " +
-                        "successfully called exists in " + execType + " mode", true, 
-                        Util.exists(ps.getPigContext(), cleanupSuccessFile2));
+                for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+                    String condition = entry.getValue() ? "" : "not";
+                    assertEquals("Checking if file " + entry.getKey() +
+                            " does " + condition + " exists in " + execType +
+                            " mode", (boolean) entry.getValue(),
+                            Util.exists(ps.getPigContext(), entry.getKey()));
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail("Exception encountered - hence failing:" + e);
-        } finally {
-            Util.deleteFile(ps.getPigContext(), inputFileName);
-            Util.deleteFile(ps.getPigContext(), outputFileName1);
-            Util.deleteFile(ps.getPigContext(), outputFileName2);
-            Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
-            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
-            Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
-            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
         }
     }
-    
+
     // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs"
     // property is set to true
     // The test covers multi store and single store case in local and mapreduce mode
@@ -552,8 +567,7 @@ public class TestStore extends junit.fra
     @Test
     public void testSuccessFileCreation1() throws Exception {
         PigServer ps = null;
-        String[] files = new String[] { inputFileName, 
-                outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+        
         try {
             ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
             String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
@@ -589,7 +603,7 @@ public class TestStore extends junit.fra
                         ps.getPigContext().getProperties().setProperty(
                                 MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
                                 Boolean.toString(isPropertySet));
-                        cleanupFiles(ps, files);
+                        Util.deleteFile(ps.getPigContext(), TESTDIR);
                         ps.setBatchOn();
                         Util.createInputFile(ps.getPigContext(), 
                                 inputFileName, inputData);
@@ -606,7 +620,7 @@ public class TestStore extends junit.fra
                 }
             }
         } finally {
-            cleanupFiles(ps, files);
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
         }
     }
 
@@ -618,8 +632,6 @@ public class TestStore extends junit.fra
     @Test
     public void testSuccessFileCreation2() throws Exception {
         PigServer ps = null;
-        String[] files = new String[] { inputFileName, 
-                outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
         try {
             ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
             String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
@@ -663,7 +675,7 @@ public class TestStore extends junit.fra
                         ps.getPigContext().getProperties().setProperty(
                                 MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
                                 Boolean.toString(isPropertySet));
-                        cleanupFiles(ps, files);
+                        Util.deleteFile(ps.getPigContext(), TESTDIR);
                         ps.setBatchOn();
                         Util.createInputFile(ps.getPigContext(), 
                                 inputFileName, inputData);
@@ -687,7 +699,7 @@ public class TestStore extends junit.fra
                 }
             }
         } finally {
-            cleanupFiles(ps, files);
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
         }
     }
 
@@ -700,13 +712,7 @@ public class TestStore extends junit.fra
         }
         
     }
-    private void cleanupFiles(PigServer ps, String... files) throws IOException {
-        for(String file:files) {
-            Util.deleteFile(ps.getPigContext(), file);
-        }    
-    }
-    
-    
+
     public static class DummyStore extends PigStorage implements StoreMetadata{
 
         private boolean failInPutNext = false;
@@ -734,6 +740,12 @@ public class TestStore extends junit.fra
             super.putNext(t);
         }
 
+        @SuppressWarnings("rawtypes")
+        @Override
+        public OutputFormat getOutputFormat() {
+            return new DummyOutputFormat(outputFileSuffix);
+        }
+
         @Override
         public void storeSchema(ResourceSchema schema, String location,
                 Job job) throws IOException {
@@ -814,4 +826,113 @@ public class TestStore extends junit.fra
             Assert.assertEquals(expected, p);
         }
     }
+
+    static class DummyOutputFormat extends PigTextOutputFormat {
+
+        private String outputFileSuffix;
+
+        public DummyOutputFormat(String outputFileSuffix) {
+            super((byte) '\t');
+            this.outputFileSuffix = outputFileSuffix;
+        }
+
+        @Override
+        public synchronized OutputCommitter getOutputCommitter(
+                TaskAttemptContext context) throws IOException {
+            return new DummyOutputCommitter(outputFileSuffix,
+                    super.getOutputCommitter(context));
+        }
+
+        @Override
+        public Path getDefaultWorkFile(TaskAttemptContext context,
+                String extension) throws IOException {
+            FileOutputCommitter committer =
+                    (FileOutputCommitter) super.getOutputCommitter(context);
+            return new Path(committer.getWorkPath(), getUniqueFile(context,
+                    "part", extension));
+        }
+
+    }
+
+    static class DummyOutputCommitter extends OutputCommitter {
+
+        static String FILE_SETUPJOB_CALLED = "/tmp/TestStore/_setupJob_called";
+        static String FILE_SETUPTASK_CALLED = "/tmp/TestStore/_setupTask_called";
+        static String FILE_COMMITTASK_CALLED = "/tmp/TestStore/_commitTask_called";
+        static String FILE_ABORTTASK_CALLED = "/tmp/TestStore/_abortTask_called";
+        static String FILE_CLEANUPJOB_CALLED = "/tmp/TestStore/_cleanupJob_called";
+        static String FILE_COMMITJOB_CALLED = "/tmp/TestStore/_commitJob_called";
+        static String FILE_ABORTJOB_CALLED = "/tmp/TestStore/_abortJob_called";
+
+        private String outputFileSuffix;
+        private OutputCommitter baseCommitter;
+
+        public DummyOutputCommitter(String outputFileSuffix,
+                OutputCommitter baseCommitter) throws IOException {
+            this.outputFileSuffix = outputFileSuffix;
+            this.baseCommitter = baseCommitter;
+        }
+
+        @Override
+        public void setupJob(JobContext jobContext) throws IOException {
+            baseCommitter.setupJob(jobContext);
+            createFile(jobContext, FILE_SETUPJOB_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseCommitter.setupTask(taskContext);
+            createFile(taskContext, FILE_SETUPTASK_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                throws IOException {
+            return true;
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseCommitter.commitTask(taskContext);
+            createFile(taskContext, FILE_COMMITTASK_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseCommitter.abortTask(taskContext);
+            createFile(taskContext, FILE_ABORTTASK_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void cleanupJob(JobContext jobContext) throws IOException {
+            baseCommitter.cleanupJob(jobContext);
+            createFile(jobContext, FILE_CLEANUPJOB_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            baseCommitter.commitJob(jobContext);
+            createFile(jobContext, FILE_COMMITJOB_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, State state)
+                throws IOException {
+            baseCommitter.abortJob(jobContext, state);
+            createFile(jobContext, FILE_ABORTJOB_CALLED + outputFileSuffix);
+        }
+
+        public void createFile(JobContext jobContext, String fileName)
+                throws IOException {
+            Configuration conf = jobContext.getConfiguration();
+            FileSystem fs = FileSystem.get(conf);
+            fs.mkdirs(new Path(fileName).getParent());
+            FSDataOutputStream out = fs.create(new Path(fileName), true);
+            out.close();
+        }
+
+    }
 }

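The verification pattern in the tests above is worth spelling out:
DummyOutputCommitter drops an empty marker file for each lifecycle
callback it receives, and the assertions then check which markers do and
do not exist after a successful or a failed run. The same idea in
isolation (a hedged sketch with hypothetical names, not code from this
commit):

    // Hypothetical helper showing the marker-file technique: record each
    // callback as an empty file on the FileSystem, then assert existence.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class LifecycleMarkers {
        // Create an empty marker file, making parent directories as needed.
        static void touch(Configuration conf, String name) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(name);
            fs.mkdirs(path.getParent());
            fs.create(path, true).close();
        }

        // Fail if the marker's existence does not match the expectation.
        static void assertMarker(Configuration conf, String name,
                boolean expected) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(new Path(name)) != expected) {
                throw new AssertionError(name
                        + (expected ? " should exist" : " should not exist"));
            }
        }
    }
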
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Sep 20 03:07:57 2012
@@ -575,6 +575,10 @@ public class Util {
     }
 	
 	static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {
+	    File parent = new File(localFileName).getParentFile();
+	    if (!parent.exists()) {
+	        parent.mkdirs();
+	    }
 	    PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
 	    
 	    FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(