You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/04/12 23:21:00 UTC
svn commit: r1325522 - in /pig/branches/branch-0.9: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
test/org/apache/pig/test/TestStore.java
Author: daijy
Date: Thu Apr 12 21:20:59 2012
New Revision: 1325522
URL: http://svn.apache.org/viewvc?rev=1325522&view=rev
Log:
PIG-2642: StoreMetadata.storeSchema can't access files in the output directory (Hadoop 0.23)
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
pig/branches/branch-0.9/test/org/apache/pig/test/TestStore.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1325522&r1=1325521&r2=1325522&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Thu Apr 12 21:20:59 2012
@@ -28,6 +28,8 @@ PIG-2619: HBaseStorage constructs a Scan
BUG FIXES
+PIG-2642: StoreMetadata.storeSchema can't access files in the output directory (Hadoop 0.23) (thw via daijy)
+
PIG-2621: Documentation inaccurate regarding Pig Properties in trunk (prkommireddi via daijy)
PIG-2540: [piggybank] AvroStorage can't read schema on amazon s3 in elastic mapreduce (rjurney via jcoveney)
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1325522&r1=1325521&r2=1325522&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Apr 12 21:20:59 2012
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -34,9 +33,7 @@ import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
/**
@@ -169,15 +166,15 @@ public class PigOutputCommitter extends
}
}
-
- // This method only be called in 20.203+
+
+ // This method will only be called in 20.203+/0.23
public void commitJob(JobContext context) throws IOException {
// call commitJob on all map and reduce committers
for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
if (mapCommitter.first!=null) {
JobContext updatedContext = setUpContext(context,
mapCommitter.second);
- storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
+ // PIG-2642 promote files before calling storeCleanup/storeSchema
try {
// Use reflection, 20.2 does not have such method
Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
@@ -186,6 +183,7 @@ public class PigOutputCommitter extends
} catch (Exception e) {
throw new IOException(e);
}
+ storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
}
}
for (Pair<OutputCommitter, POStore> reduceCommitter :
@@ -193,7 +191,7 @@ public class PigOutputCommitter extends
if (reduceCommitter.first!=null) {
JobContext updatedContext = setUpContext(context,
reduceCommitter.second);
- storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
+ // PIG-2642 promote files before calling storeCleanup/storeSchema
try {
// Use reflection, 20.2 does not have such method
Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
@@ -202,6 +200,7 @@ public class PigOutputCommitter extends
} catch (Exception e) {
throw new IOException(e);
}
+ storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
}
}
}
Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestStore.java?rev=1325522&r1=1325521&r2=1325522&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestStore.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestStore.java Thu Apr 12 21:20:59 2012
@@ -29,6 +29,7 @@ import java.util.Random;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -37,9 +38,9 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreMetadata;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -729,6 +730,17 @@ public class TestStore extends junit.fra
public void storeSchema(ResourceSchema schema, String location,
Job job) throws IOException {
FileSystem fs = FileSystem.get(job.getConfiguration());
+ // verify that output is available prior to storeSchema call
+ Path resultPath = new Path(location, "part-m-00000");
+ if (!fs.exists(resultPath)) {
+ FileStatus[] listing = fs.listStatus(new Path(location));
+ for (FileStatus fstat : listing) {
+ System.err.println(fstat.getPath());
+ }
+ // throwing here skips creation of the marker file below, which fails the test
+ throw new IOException("" + resultPath + " not available in storeSchema");
+ }
+
// create a file to test that this method got called - if it gets called
// multiple times, the create will throw an Exception
fs.create(