You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/06/04 02:45:25 UTC
svn commit: r951229 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
test/org/apache/pig/test/TestStore.java
Author: pradeepkth
Date: Fri Jun 4 00:45:24 2010
New Revision: 951229
URL: http://svn.apache.org/viewvc?rev=951229&view=rev
Log:
PIG-1433: pig should create success file if mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=951229&r1=951228&r2=951229&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun 4 00:45:24 2010
@@ -70,6 +70,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1433: pig should create success file if
+mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth)
+
PIG-1347: Clear up output directory for a failed job (daijy)
PIG-1419: Remove "user.name" from JobConf (daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=951229&r1=951228&r2=951229&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun 4 00:45:24 2010
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
@@ -79,6 +80,11 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
private Map<FileSpec, Exception> failureMap;
+ public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+
+ public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
/**
* Get the exception that caused a failure on the backend for a
* store location (if any).
@@ -309,6 +315,9 @@ public class MapReduceLauncher extends L
}
if (!st.isTmpStore()) {
succeededStores.add(st);
+ // create an "_SUCCESS" file in output location if
+ // output location is a filesystem dir
+ createSuccessFile(job, st);
finalStores++;
if (st.isMultiStore()) {
String counterName = PigStatsUtil.getMultiStoreCounterName(st);
@@ -509,6 +518,24 @@ public class MapReduceLauncher extends L
PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
}
+ private boolean shouldMarkOutputDir(Job job) {
+ return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+ false);
+ }
+
+    /**
+     * Creates an empty "_SUCCESS" marker file inside the output location of
+     * the given store, provided that marking is enabled in the job
+     * configuration and the output location exists. An already existing
+     * marker file is left untouched.
+     * <p>
+     * The file system is resolved from the output path itself instead of
+     * taking the default file system of the configuration, so that a store
+     * location qualified with its own scheme/authority is checked and marked
+     * on the file system it actually lives on.
+     *
+     * @param job the job whose configuration is consulted
+     * @param store the store whose output location is to be marked
+     * @throws IOException if the file system cannot be accessed or the
+     *         marker file cannot be created
+     */
+    private void createSuccessFile(Job job, POStore store) throws IOException {
+        if (!shouldMarkOutputDir(job)) {
+            return;
+        }
+        Path outputPath = new Path(store.getSFile().getFileName());
+        FileSystem fs = outputPath.getFileSystem(job.getJobConf());
+        if (!fs.exists(outputPath)) {
+            // the store location is not present on the file system,
+            // so there is nothing to mark
+            return;
+        }
+        // create a file in the folder to mark it
+        Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        if (!fs.exists(filePath)) {
+            fs.create(filePath).close();
+        }
+    }
/**
* An exception handler class to handle exceptions thrown by the job controller thread
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=951229&r1=951228&r2=951229&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Fri Jun 4 00:45:24 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
@@ -55,6 +56,7 @@ import org.apache.pig.data.DefaultBagFac
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -89,7 +91,14 @@ public class TestStore extends junit.fra
String inputFileName;
String outputFileName;
-
+
+ private static final String DUMMY_STORE_CLASS_NAME
+ = "org.apache.pig.test.TestStore\\$DummyStore";
+
+ private static final String FAIL_UDF_NAME
+ = "org.apache.pig.test.TestStore\\$FailUDF";
+ private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts";
+
@Override
@Before
public void setUp() throws Exception {
@@ -558,8 +567,168 @@ public class TestStore extends junit.fra
}
}
- private static final String DUMMY_STORE_CLASS_NAME
- = "org.apache.pig.test.TestStore\\$DummyStore";
+ // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+ // property is set to true
+ // The test covers multi store and single store case in local and mapreduce mode
+ // The test also checks that "_SUCCESS" file is NOT created when the property
+ // is not set to true in all the modes.
+ @Test
+ public void testSuccessFileCreation1() throws Exception {
+ PigServer ps = null;
+ String[] files = new String[] { inputFileName,
+ outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+ try {
+ ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+ String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
+
+ String multiStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = filter a by $0 == 'hello';" +
+ "c = filter a by $0 == 'hi';" +
+ "d = filter a by $0 == 'bye';" +
+ "store b into '" + outputFileName + "_1';" +
+ "store c into '" + outputFileName + "_2';" +
+ "store d into '" + outputFileName + "_3';";
+
+ String singleStoreScript = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName + "_1';" ;
+
+ for (ExecType execType : modes) {
+ for(boolean isPropertySet: new boolean[] { true, false}) {
+ for(boolean isMultiStore: new boolean[] { true, false}) {
+ String script = (isMultiStore ? multiStoreScript :
+ singleStoreScript);
+ // since we will be switching between map red and local modes
+ // we will need to make sure filelocalizer is reset before each
+ // run.
+ FileLocalizer.setInitialized(false);
+ if(execType == ExecType.MAPREDUCE) {
+ ps = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ } else {
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ ps = new PigServer(ExecType.LOCAL, props);
+ }
+ ps.getPigContext().getProperties().setProperty(
+ MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+ Boolean.toString(isPropertySet));
+ cleanupFiles(ps, files);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+ String sucFile = outputFileName + "_" + i + "/" +
+ MapReduceLauncher.SUCCEEDED_FILE_NAME;
+ assertEquals("Checking if _SUCCESS file exists in " +
+ execType + " mode", isPropertySet,
+ Util.exists(ps.getPigContext(), sucFile));
+ }
+ }
+ }
+ }
+ } finally {
+ cleanupFiles(ps, files);
+ }
+ }
+
+ // Test _SUCCESS file is NOT created when job fails and when
+ // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true
+ // The test covers multi store and single store case in local and mapreduce mode
+ // The test also checks that "_SUCCESS" file is NOT created when the property
+ // is not set to true in all the modes.
+ @Test
+ public void testSuccessFileCreation2() throws Exception {
+ PigServer ps = null;
+ String[] files = new String[] { inputFileName,
+ outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+ try {
+ ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+ String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
+ System.err.println("XXX: " + TestStore.FailUDF.class.getName());
+ String multiStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = filter a by $0 == 'hello';" +
+ "b = foreach b generate " + FAIL_UDF_NAME + "($0);" +
+ "c = filter a by $0 == 'hi';" +
+ "d = filter a by $0 == 'bye';" +
+ "store b into '" + outputFileName + "_1';" +
+ "store c into '" + outputFileName + "_2';" +
+ "store d into '" + outputFileName + "_3';";
+
+ String singleStoreScript = "a = load '"+ inputFileName + "';" +
+ "b = foreach a generate " + FAIL_UDF_NAME + "($0);" +
+ "store b into '" + outputFileName + "_1';" ;
+
+ for (ExecType execType : modes) {
+ for(boolean isPropertySet: new boolean[] { true, false}) {
+ for(boolean isMultiStore: new boolean[] { true, false}) {
+ String script = (isMultiStore ? multiStoreScript :
+ singleStoreScript);
+ // since we will be switching between map red and local modes
+ // we will need to make sure filelocalizer is reset before each
+ // run.
+ FileLocalizer.setInitialized(false);
+ if(execType == ExecType.MAPREDUCE) {
+ // since the job is guaranteed to fail, let's set
+ // number of retries to 1.
+ Properties props = cluster.getProperties();
+ props.setProperty(MAP_MAX_ATTEMPTS, "1");
+ ps = new PigServer(ExecType.MAPREDUCE, props);
+ } else {
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ // since the job is guaranteed to fail, let's set
+ // number of retries to 1.
+ props.setProperty(MAP_MAX_ATTEMPTS, "1");
+ ps = new PigServer(ExecType.LOCAL, props);
+ }
+ ps.getPigContext().getProperties().setProperty(
+ MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+ Boolean.toString(isPropertySet));
+ cleanupFiles(ps, files);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ try {
+ ps.executeBatch();
+ } catch(IOException ioe) {
+ if(!ioe.getMessage().equals("FailUDFException")) {
+ // an unexpected exception
+ throw ioe;
+ }
+ }
+ for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) {
+ String sucFile = outputFileName + "_" + i + "/" +
+ MapReduceLauncher.SUCCEEDED_FILE_NAME;
+ assertEquals("Checking if _SUCCESS file exists in " +
+ execType + " mode", false,
+ Util.exists(ps.getPigContext(), sucFile));
+ }
+ }
+ }
+ }
+ } finally {
+ cleanupFiles(ps, files);
+ }
+ }
+
+    /**
+     * A UDF whose {@link #exec(Tuple)} throws an IOException on every call,
+     * used to make a job fail.
+     */
+    public static class FailUDF extends EvalFunc<String> {
+
+        @Override
+        public String exec(Tuple input) throws IOException {
+            final IOException failure = new IOException("FailUDFException");
+            throw failure;
+        }
+
+    }
+    /**
+     * Deletes each of the given files through the PigContext of the given
+     * PigServer.
+     * <p>
+     * Does nothing when <code>ps</code> is null. The tests call this method
+     * from a <code>finally</code> block in which the server may not have been
+     * created yet; a NullPointerException thrown here would mask the
+     * exception that caused the test to fail.
+     *
+     * @param ps the server whose PigContext is used for the deletion, may be null
+     * @param files the files to delete
+     * @throws IOException if a deletion fails
+     */
+    private void cleanupFiles(PigServer ps, String... files) throws IOException {
+        if (ps == null) {
+            return;
+        }
+        for (String file : files) {
+            Util.deleteFile(ps.getPigContext(), file);
+        }
+    }
+
public static class DummyStore extends PigStorage implements StoreMetadata{
@@ -622,7 +791,6 @@ public class TestStore extends junit.fra
@Override
public void storeStatistics(ResourceStatistics stats, String location,
Job job) throws IOException {
- // TODO Auto-generated method stub
}
}