You are viewing a plain-text version of this content. The canonical version was linked from the word "here" in the original page; that hyperlink did not survive the plain-text conversion.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/06/04 18:05:40 UTC

svn commit: r951469 - in /hadoop/pig/branches/branch-0.7: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java test/org/apache/pig/test/TestStore.java

Author: pradeepkth
Date: Fri Jun  4 16:05:40 2010
New Revision: 951469

URL: http://svn.apache.org/viewvc?rev=951469&view=rev
Log:
PIG-1433: pig should create success file if mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)

Modified:
    hadoop/pig/branches/branch-0.7/CHANGES.txt
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=951469&r1=951468&r2=951469&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Jun  4 16:05:40 2010
@@ -189,6 +189,9 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1433: pig should create success file if
+mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)
+
 PIG-1415: LoadFunc signature is not correct in LoadFunc.getSchema sometimes (daijy)
 
 PIG-1403: Make Pig work with remote HDFS in secure mode (daijy)

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=951469&r1=951468&r2=951469&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun  4 16:05:40 2010
@@ -29,6 +29,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -77,6 +79,11 @@ public class MapReduceLauncher extends L
     private boolean aggregateWarning = false;
     private Map<FileSpec, Exception> failureMap;
 
+    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    
+    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+    
     /**
      * Get the exception that caused a failure on the backend for a
      * store location (if any).
@@ -304,6 +311,9 @@ public class MapReduceLauncher extends L
                     }
                     if (!st.isTmpStore()) {
                         succeededStores.add(st);
+                        // create an "_SUCCESS" file in output location if 
+                        // output location is a filesystem dir
+                        createSuccessFile(job, st);
                         finalStores++;
                         log.info("Successfully stored result in: \""+st.getSFile().getFileName()+"\"");
                     }
@@ -488,6 +498,24 @@ public class MapReduceLauncher extends L
         PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
     }
 
+    private boolean shouldMarkOutputDir(Job job) {
+        return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                               false);
+    }
+    
+    private void createSuccessFile(Job job, POStore store) throws IOException {
+        if(shouldMarkOutputDir(job)) {
+            FileSystem fs = FileSystem.get(job.getJobConf());
+            Path outputPath = new Path(store.getSFile().getFileName());
+            if(fs.exists(outputPath)){
+                // create a file in the folder to mark it
+                Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+                if(!fs.exists(filePath)) {
+                    fs.create(filePath).close();
+                }
+            }    
+        } 
+    }
     
     /**
      * An exception handler class to handle exceptions thrown by the job controller thread

Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java?rev=951469&r1=951468&r2=951469&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestStore.java Fri Jun  4 16:05:40 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -55,6 +56,7 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -89,7 +91,10 @@ public class TestStore extends junit.fra
         
     String inputFileName;
     String outputFileName;
-        
+    
+
+    private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts"; 
+    
     @Override
     @Before
     public void setUp() throws Exception {
@@ -557,6 +562,170 @@ public class TestStore extends junit.fra
             Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
         }
     }
+    
+    // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+    // property is set to true
+    // The test covers multi store and single store case in local and mapreduce mode
+    // The test also checks that "_SUCCESS" file is NOT created when the property
+    // is not set to true in all the modes.
+    @Test
+    public void testSuccessFileCreation1() throws Exception {
+        PigServer ps = null;
+        String[] files = new String[] { inputFileName, 
+                outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+        try {
+            ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
+            
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hello';" +
+                    "c = filter a by $0 == 'hi';" +
+                    "d = filter a by $0 == 'bye';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" + 
+                    "store d into '" + outputFileName + "_3';";
+            
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                "store a into '" + outputFileName + "_1';" ;
+            
+            for (ExecType execType : modes) {
+                for(boolean isPropertySet: new boolean[] { true, false}) {
+                    for(boolean isMultiStore: new boolean[] { true, false}) {
+                        String script = (isMultiStore ? multiStoreScript : 
+                            singleStoreScript);
+                        // since we will be switching between map red and local modes
+                        // we will need to make sure filelocalizer is reset before each
+                        // run.
+                        FileLocalizer.setInitialized(false);
+                        if(execType == ExecType.MAPREDUCE) {
+                            ps = new PigServer(ExecType.MAPREDUCE, 
+                                    cluster.getProperties());
+                        } else {
+                            Properties props = new Properties();                                          
+                            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                            ps = new PigServer(ExecType.LOCAL, props);
+                        }
+                        ps.getPigContext().getProperties().setProperty(
+                                MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                                Boolean.toString(isPropertySet));
+                        cleanupFiles(ps, files);
+                        ps.setBatchOn();
+                        Util.createInputFile(ps.getPigContext(), 
+                                inputFileName, inputData);
+                        Util.registerMultiLineQuery(ps, script);
+                        ps.executeBatch();
+                        for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                            String sucFile = outputFileName + "_" + i + "/" + 
+                                               MapReduceLauncher.SUCCEEDED_FILE_NAME;
+                            assertEquals("Checking if _SUCCESS file exists in " + 
+                                    execType + " mode", isPropertySet, 
+                                    Util.exists(ps.getPigContext(), sucFile));
+                        }
+                    }
+                }
+            }
+        } finally {
+            cleanupFiles(ps, files);
+        }
+    }
+
+    // Test _SUCCESS file is NOT created when job fails and when 
+    // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true
+    // The test covers multi store and single store case in local and mapreduce mode
+    // The test also checks that "_SUCCESS" file is NOT created when the property
+    // is not set to true in all the modes.
+    @Test
+    public void testSuccessFileCreation2() throws Exception {
+        PigServer ps = null;
+        String[] files = new String[] { inputFileName, 
+                outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+        try {
+            ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+            String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
+            System.err.println("XXX: " + TestStore.FailUDF.class.getName());
+            String multiStoreScript = "a = load '"+ inputFileName + "';" +
+                    "b = filter a by $0 == 'hello';" +
+                    "b = foreach b generate " + FailUDF.class.getName() + "($0);" + 
+                    "c = filter a by $0 == 'hi';" +
+                    "d = filter a by $0 == 'bye';" +
+                    "store b into '" + outputFileName + "_1';" +
+                    "store c into '" + outputFileName + "_2';" + 
+                    "store d into '" + outputFileName + "_3';";
+            
+            String singleStoreScript =  "a = load '"+ inputFileName + "';" +
+                "b = foreach a generate " + FailUDF.class.getName() + "($0);" + 
+                "store b into '" + outputFileName + "_1';" ;
+            
+            for (ExecType execType : modes) {
+                for(boolean isPropertySet: new boolean[] { true, false}) {
+                    for(boolean isMultiStore: new boolean[] { true, false}) {
+                        String script = (isMultiStore ? multiStoreScript : 
+                            singleStoreScript);
+                        // since we will be switching between map red and local modes
+                        // we will need to make sure filelocalizer is reset before each
+                        // run.
+                        FileLocalizer.setInitialized(false);
+                        if(execType == ExecType.MAPREDUCE) {
+                            // since the job is guaranteed to fail, let's set 
+                            // number of retries to 1.
+                            Properties props = cluster.getProperties();
+                            props.setProperty(MAP_MAX_ATTEMPTS, "1");
+                            ps = new PigServer(ExecType.MAPREDUCE, props);
+                        } else {
+                            Properties props = new Properties();                                          
+                            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                            // since the job is guaranteed to fail, let's set 
+                            // number of retries to 1.
+                            props.setProperty(MAP_MAX_ATTEMPTS, "1");
+                            ps = new PigServer(ExecType.LOCAL, props);
+                        }
+                        ps.getPigContext().getProperties().setProperty(
+                                MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, 
+                                Boolean.toString(isPropertySet));
+                        cleanupFiles(ps, files);
+                        ps.setBatchOn();
+                        Util.createInputFile(ps.getPigContext(), 
+                                inputFileName, inputData);
+                        Util.registerMultiLineQuery(ps, script);
+                        try {
+                            ps.executeBatch();
+                        } catch(IOException ioe) {
+                            if(!ioe.getMessage().equals("FailUDFException")) {
+                                // an unexpected exception
+                                throw ioe;
+                            }
+                        }
+                        for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+                            String sucFile = outputFileName + "_" + i + "/" + 
+                                               MapReduceLauncher.SUCCEEDED_FILE_NAME;
+                            assertEquals("Checking if _SUCCESS file exists in " + 
+                                    execType + " mode", false, 
+                                    Util.exists(ps.getPigContext(), sucFile));
+                        }
+                    }
+                }
+            }
+        } finally {
+            cleanupFiles(ps, files);
+        }
+    }
+
+    // A UDF which always throws an Exception so that the job can fail
+    public static class FailUDF extends EvalFunc<String> {
+
+        @Override
+        public String exec(Tuple input) throws IOException {
+            throw new IOException("FailUDFException");
+        }
+        
+    }
+    private void cleanupFiles(PigServer ps, String... files) throws IOException {
+        for(String file:files) {
+            Util.deleteFile(ps.getPigContext(), file);
+        }    
+    }
+    
+    
     public static class DummyStore extends PigStorage implements StoreMetadata{
 
         private boolean failInPutNext = false;
@@ -618,7 +787,6 @@ public class TestStore extends junit.fra
         @Override
         public void storeStatistics(ResourceStatistics stats, String location,
                 Job job) throws IOException {
-            // TODO Auto-generated method stub
             
         }
     }