Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/02 02:01:52 UTC

svn commit: r917827 [2/2] - in /hadoop/pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ contrib/zebra/src/test/org/apache/hadoop/zebra/pig/ src/org/apache/pig/ src/...
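
As the TestStore diff below shows, the StoreMetadata callbacks now receive a
Job rather than a raw Configuration, and BinStorage.getSchema is likewise
called with a Job. A minimal sketch of an implementation against the new
signatures (the class name and the ".schema" side file are illustrative, not
part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceStatistics;
    import org.apache.pig.StoreMetadata;
    import org.apache.pig.builtin.PigStorage;

    public class SchemaSideFileStore extends PigStorage implements StoreMetadata {
        @Override
        public void storeSchema(ResourceSchema schema, String location, Job job)
                throws IOException {
            // the store location is now passed in explicitly rather than read
            // from the conf; the conf is reachable via job.getConfiguration()
            FileSystem fs = FileSystem.get(job.getConfiguration());
            fs.create(new Path(location + ".schema"), false).close();
        }

        @Override
        public void storeStatistics(ResourceStatistics stats, String location,
                Job job) throws IOException {
            // no statistics persisted in this sketch
        }
    }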

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Tue Mar  2 01:01:51 2010
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
 import junit.framework.Assert;
@@ -30,18 +31,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -51,6 +46,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
@@ -329,8 +325,8 @@
             Util.registerMultiLineQuery(pig, query);
             pig.executeBatch();
             ResourceSchema rs = new BinStorage().getSchema(outputFileName, 
-                    ConfigurationUtil.toConfiguration(pig.getPigContext().
-                            getProperties()));
+                    new Job(ConfigurationUtil.toConfiguration(pig.getPigContext().
+                            getProperties())));
             Schema expectedSchema = Util.getSchemaFromString(
                     "c:chararray,i:int,d:double");
             Assert.assertTrue("Checking binstorage getSchema output", Schema.equals( 
@@ -399,7 +395,9 @@
                     Util.deleteFile(ps.getPigContext(), outputFileName);
                     Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
                 } else {
-                    ps = new PigServer(ExecType.LOCAL);
+                    Properties props = new Properties();
+                    props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                    ps = new PigServer(ExecType.LOCAL, props);
                     Util.deleteFile(ps.getPigContext(), inputFileName);
                     Util.deleteFile(ps.getPigContext(), outputFileName);
                     Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
@@ -424,61 +422,195 @@
         }
     }
     
-    public static class DummyStore extends StoreFunc implements StoreMetadata{
-
-        @Override
-        public void checkSchema(ResourceSchema s) throws IOException {
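+    // Verifies that a failure in putNext() triggers cleanupOnFailure() on the
+    // StoreFunc and that the store's output location gets removed as a result.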
+    @Test
+    public void testCleanupOnFailure() throws Exception {
+        PigServer ps = null;
+        String cleanupSuccessFile = outputFileName + "_cleanupOnFailure_succeeded";
+        String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed";
+        try {
+            ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+            String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
             
+            String script = "a = load '"+ inputFileName + "';" +
+                    "store a into '" + outputFileName + "' using " + 
+                    DummyStore.class.getName() + "('true');";
+            
+            for (ExecType execType : modes) {
+                if(execType == ExecType.MAPREDUCE) {
+                    ps = new PigServer(ExecType.MAPREDUCE, 
+                            cluster.getProperties());
+                } else {
+                    Properties props = new Properties();
+                    props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                    ps = new PigServer(ExecType.LOCAL, props);
+                }
+                Util.deleteFile(ps.getPigContext(), inputFileName);
+                Util.deleteFile(ps.getPigContext(), outputFileName);
+                Util.deleteFile(ps.getPigContext(), cleanupFailFile);
+                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
+                ps.setBatchOn();
+                Util.createInputFile(ps.getPigContext(), 
+                        inputFileName, inputData);
+                Util.registerMultiLineQuery(ps, script);
+                ps.executeBatch();
+                assertEquals(
+                        "Checking that the file indicating that cleanupOnFailure " +
+                        "failed does not exist in " + execType + " mode", false,
+                        Util.exists(ps.getPigContext(), cleanupFailFile));
+                assertEquals(
+                        "Checking that the file indicating that cleanupOnFailure " +
+                        "was successfully called exists in " + execType + " mode", true,
+                        Util.exists(ps.getPigContext(), cleanupSuccessFile));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("Exception encountered - hence failing:" + e);
+        } finally {
+            Util.deleteFile(ps.getPigContext(), inputFileName);
+            Util.deleteFile(ps.getPigContext(), outputFileName);
+            Util.deleteFile(ps.getPigContext(), cleanupFailFile);
+            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
         }
-
-        @Override
-        public OutputFormat getOutputFormat() throws IOException {
-            // we don't really write in the test - so this is just to keep
-            // Pig/hadoop happy
-            return new TextOutputFormat<Long, Text>();
-        }
-
-        @Override
-        public void prepareToWrite(RecordWriter writer) throws IOException {
+    }
+    
+    
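+    // Verifies that when one store in a multi-store script fails, Pig invokes
+    // cleanupOnFailure() exactly once for each of the store locations.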
+    @Test
+    public void testCleanupOnFailureMultiStore() throws Exception {
+        PigServer ps = null;
+        String outputFileName1 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+        String outputFileName2 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+        String cleanupSuccessFile1 = outputFileName1 + "_cleanupOnFailure_succeeded1";
+        String cleanupFailFile1 = outputFileName1 + "_cleanupOnFailure_failed1";
+        String cleanupSuccessFile2 = outputFileName2 + "_cleanupOnFailure_succeeded2";
+        String cleanupFailFile2 = outputFileName2 + "_cleanupOnFailure_failed2";
+        
+        try {
+            ExecType[] modes = new ExecType[] { /*ExecType.LOCAL, */ExecType.MAPREDUCE};
+            String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
             
+            // the second store on its own would not cause a failure, but the
+            // first one does, and as a result both stores should be considered
+            // to have failed
+            String script = "a = load '"+ inputFileName + "';" +
+                    "store a into '" + outputFileName1 + "' using " + 
+                    DummyStore.class.getName() + "('true', '1');" +
+                    "store a into '" + outputFileName2 + "' using " + 
+                    DummyStore.class.getName() + "('false', '2');"; 
+            
+            for (ExecType execType : modes) {
+                if(execType == ExecType.MAPREDUCE) {
+                    ps = new PigServer(ExecType.MAPREDUCE, 
+                            cluster.getProperties());
+                } else {
+                    Properties props = new Properties();
+                    props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                    ps = new PigServer(ExecType.LOCAL, props);
+                }
+                Util.deleteFile(ps.getPigContext(), inputFileName);
+                Util.deleteFile(ps.getPigContext(), outputFileName1);
+                Util.deleteFile(ps.getPigContext(), outputFileName2);
+                Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
+                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
+                Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
+                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
+                ps.setBatchOn();
+                Util.createInputFile(ps.getPigContext(), 
+                        inputFileName, inputData);
+                Util.registerMultiLineQuery(ps, script);
+                ps.executeBatch();
+                assertEquals(
+                        "Checking that the file indicating that cleanupOnFailure " +
+                        "failed does not exist in " + execType + " mode", false,
+                        Util.exists(ps.getPigContext(), cleanupFailFile1));
+                assertEquals(
+                        "Checking that the file indicating that cleanupOnFailure " +
+                        "failed does not exist in " + execType + " mode", false,
+                        Util.exists(ps.getPigContext(), cleanupFailFile2));
+                assertEquals(
+                        "Checking that the file indicating that cleanupOnFailure " +
+                        "was successfully called exists in " + execType + " mode", true,
+                        Util.exists(ps.getPigContext(), cleanupSuccessFile1));
+                assertEquals(
+                        "Checking that the file indicating that cleanupOnFailure " +
+                        "was successfully called exists in " + execType + " mode", true,
+                        Util.exists(ps.getPigContext(), cleanupSuccessFile2));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("Exception encountered - hence failing:" + e);
+        } finally {
+            Util.deleteFile(ps.getPigContext(), inputFileName);
+            Util.deleteFile(ps.getPigContext(), outputFileName1);
+            Util.deleteFile(ps.getPigContext(), outputFileName2);
+            Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
+            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
+            Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
+            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
         }
+    }
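+    // A PigStorage subclass that can be made to fail in putNext() and that
+    // records, via marker files, whether storeSchema() and cleanupOnFailure()
+    // were invoked; a repeat invocation makes the non-overwriting create() throw.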
+    public static class DummyStore extends PigStorage implements StoreMetadata {
 
-        @Override
-        public void putNext(Tuple t) throws IOException {
-            // we don't really write anything out            
-        }
+        private boolean failInPutNext = false;
+        
+        private String outputFileSuffix = "";
 
-        @Override
-        public String relToAbsPathForStoreLocation(String location, Path curDir)
-                throws IOException {
-            return location;
+        public DummyStore(String failInPutNextStr) {
+            failInPutNext = Boolean.parseBoolean(failInPutNextStr);
         }
-
-        @Override
-        public void setStoreFuncUDFContextSignature(String signature) {
-            
+        
+        public DummyStore(String failInPutNextStr, String outputFileSuffix) {
+            failInPutNext = Boolean.parseBoolean(failInPutNextStr);
+            this.outputFileSuffix = outputFileSuffix;
         }
+        
+        public DummyStore() {
+        }
+
 
         @Override
-        public void setStoreLocation(String location, Job job)
-                throws IOException {
-            FileOutputFormat.setOutputPath(job, new Path(location));
+        public void putNext(Tuple t) throws IOException {
+            if(failInPutNext) {
+                throw new IOException("Failing in putNext");
+            }
+            super.putNext(t);
         }
 
         @Override
         public void storeSchema(ResourceSchema schema, String location,
-                Configuration conf) throws IOException {
-            FileSystem fs = FileSystem.get(conf);
+                Job job) throws IOException {
+            FileSystem fs = FileSystem.get(job.getConfiguration());
             // create a file to test that this method got called - if it gets called
             // multiple times, the create will throw an Exception
             fs.create(
-                    new Path(conf.get("mapred.output.dir") + "_storeSchema_test"),
+                    new Path(location + "_storeSchema_test"),
                     false);
         }
 
         @Override
+        public void cleanupOnFailure(String location, Job job)
+                throws IOException {
+            super.cleanupOnFailure(location, job);
+            
+            // check that the output file location is not present
+            Configuration conf = job.getConfiguration();
+            FileSystem fs = FileSystem.get(conf);
+            if(fs.exists(new Path(location))) {
+                // create a file to indicate that the cleanup did not happen
+                fs.create(new Path(location + "_cleanupOnFailure_failed" + 
+                        outputFileSuffix), false);
+            }
+            // create a file to test that this method got called successfully
+            // if it gets called multiple times, the create will throw an Exception 
+            fs.create(
+                    new Path(location + "_cleanupOnFailure_succeeded" + 
+                            outputFileSuffix), false);
+        }
+
+        @Override
         public void storeStatistics(ResourceStatistics stats, String location,
-                Configuration conf) throws IOException {
+                Job job) throws IOException {
+            // this test StoreFunc does not persist any statistics
+            
         }
     }
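
For reference, the new cleanupOnFailure path can be exercised outside JUnit
with a small driver along these lines (local mode; the input and output paths
are illustrative, and DummyStore must be on the classpath):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;

    public class CleanupOnFailureDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
            PigServer ps = new PigServer(ExecType.LOCAL, props);
            ps.setBatchOn();
            // assumes /tmp/demo-input.txt already exists with tab-separated data
            ps.registerQuery("a = load '/tmp/demo-input.txt';");
            // 'true' tells DummyStore to throw in putNext(), which should make
            // Pig invoke cleanupOnFailure() for the store location
            ps.registerQuery("store a into '/tmp/demo-output' using "
                    + "org.apache.pig.test.TestStore$DummyStore('true');");
            ps.executeBatch();
            // on failure, /tmp/demo-output should be gone and the marker file
            // /tmp/demo-output_cleanupOnFailure_succeeded should exist
        }
    }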