You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/04/12 23:21:37 UTC

svn commit: r1325526 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java test/org/apache/pig/test/TestStore.java

Author: daijy
Date: Thu Apr 12 21:21:36 2012
New Revision: 1325526

URL: http://svn.apache.org/viewvc?rev=1325526&view=rev
Log:
PIG-2642: StoreMetadata.storeSchema can't access files in the output directory (Hadoop 0.23)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    pig/trunk/test/org/apache/pig/test/TestStore.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1325526&r1=1325525&r2=1325526&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 12 21:21:36 2012
@@ -467,6 +467,8 @@ Release 0.9.3 - Unreleased
 
 BUG FIXES
 
+PIG-2642: StoreMetadata.storeSchema can't access files in the output directory (Hadoop 0.23) (thw via daijy)
+
 PIG-2621: Documentation inaccurate regarding Pig Properties in trunk (prkommireddi via daijy)
 
 PIG-2550: Custom tuple results in "Unexpected datatype 110 while reading tuplefrom binary file" while spilling (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1325526&r1=1325525&r2=1325526&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Apr 12 21:21:36 2012
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,9 +33,7 @@ import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 
 /**
@@ -169,15 +166,15 @@ public class PigOutputCommitter extends 
         }
        
     }
-    
-    // This method only be called in 20.203+
+
+    // This method is only called in 20.203+/0.23
     public void commitJob(JobContext context) throws IOException {
         // call commitJob on all map and reduce committers
         for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
             if (mapCommitter.first!=null) {
                 JobContext updatedContext = setUpContext(context,
                         mapCommitter.second);
-                storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
+                // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
                     // Use reflection, 20.2 does not have such method
                     Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
@@ -186,6 +183,7 @@ public class PigOutputCommitter extends 
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
+                storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
             }
         }
         for (Pair<OutputCommitter, POStore> reduceCommitter :
@@ -193,7 +191,7 @@ public class PigOutputCommitter extends 
             if (reduceCommitter.first!=null) {
                 JobContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
-                storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
+                // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
                     // Use reflection, 20.2 does not have such method
                     Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
@@ -202,6 +200,7 @@ public class PigOutputCommitter extends 
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
+                storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
             }
         }
     }

Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1325526&r1=1325525&r2=1325526&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Thu Apr 12 21:21:36 2012
@@ -29,6 +29,7 @@ import java.util.Random;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -37,9 +38,9 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreMetadata;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -732,6 +733,17 @@ public class TestStore extends junit.fra
         public void storeSchema(ResourceSchema schema, String location,
                 Job job) throws IOException {
             FileSystem fs = FileSystem.get(job.getConfiguration());
+            // verify that output is available prior to storeSchema call
+            Path resultPath = new Path(location, "part-m-00000");
+            if (!fs.exists(resultPath)) {
+	            FileStatus[] listing = fs.listStatus(new Path(location));
+	            for (FileStatus fstat : listing) {
+	            	System.err.println(fstat.getPath());
+	            }
+	            // not creating the marker file below fails the test
+	            throw new IOException("" + resultPath + " not available in storeSchema");
+            }
+            
             // create a file to test that this method got called - if it gets called
             // multiple times, the create will throw an Exception
             fs.create(