You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2012/09/20 06:05:39 UTC
svn commit: r1387852 - in /pig/branches/branch-0.10: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
test/org/apache/pig/test/TestStore.java test/org/apache/pig/test/Util.java
Author: gates
Date: Thu Sep 20 04:05:38 2012
New Revision: 1387852
URL: http://svn.apache.org/viewvc?rev=1387852&view=rev
Log:
PIG-2712 Pig does not call OutputCommitter.abortJob() on the underlying OutputFormat
Modified:
pig/branches/branch-0.10/CHANGES.txt
pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
pig/branches/branch-0.10/test/org/apache/pig/test/TestStore.java
pig/branches/branch-0.10/test/org/apache/pig/test/Util.java
Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1387852&r1=1387851&r2=1387852&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Thu Sep 20 04:05:38 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2712: Pig does not call OutputCommitter.abortJob() on the underlying OutputFormat (rohini via gates)
+
PIG-2727: PigStorage Source tagging does not need pig.splitCombination to be turned off (prkommireddi via dvryaboy)
BUG FIXES
Modified: pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1387852&r1=1387851&r2=1387852&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Sep 20 04:05:38 2012
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ResourceSchema;
@@ -143,7 +144,7 @@ public class PigOutputCommitter extends
}
}
}
-
+
@Override
public void cleanupJob(JobContext context) throws IOException {
// call clean up on all map and reduce committers
@@ -166,7 +167,7 @@ public class PigOutputCommitter extends
}
}
-
+
// This method is only called on Hadoop 20.203+/0.23
public void commitJob(JobContext context) throws IOException {
// call commitJob on all map and reduce committers
@@ -204,6 +205,42 @@ public class PigOutputCommitter extends
}
}
}
+
+ // This method is only called on Hadoop 20.203+/0.23
+ public void abortJob(JobContext context, State state) throws IOException {
+ // call abortJob on all map and reduce committers
+ for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
+ if (mapCommitter.first!=null) {
+ JobContext updatedContext = setUpContext(context,
+ mapCommitter.second);
+ try {
+ // Use reflection, 20.2 does not have such method
+ Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+ m.setAccessible(true);
+ m.invoke(mapCommitter.first, updatedContext, state);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
+ }
+ }
+ for (Pair<OutputCommitter, POStore> reduceCommitter :
+ reduceOutputCommitters) {
+ if (reduceCommitter.first!=null) {
+ JobContext updatedContext = setUpContext(context,
+ reduceCommitter.second);
+ try {
+ // Use reflection, 20.2 does not have such method
+ Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+ m.setAccessible(true);
+ m.invoke(reduceCommitter.first, updatedContext, state);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
+ }
+ }
+ }
@Override
Modified: pig/branches/branch-0.10/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestStore.java?rev=1387852&r1=1387851&r2=1387852&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/TestStore.java (original)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/TestStore.java Thu Sep 20 04:05:38 2012
@@ -23,16 +23,24 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
@@ -45,6 +53,7 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -95,29 +104,29 @@ public class TestStore extends junit.fra
private static final String FAIL_UDF_NAME
= "org.apache.pig.test.TestStore\\$FailUDF";
private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts";
-
+ private static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName();
+
@Override
@Before
public void setUp() throws Exception {
pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pc = pig.getPigContext();
- inputFileName = "/tmp/TestStore-" + new Random().nextLong() + ".txt";
- outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+ inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt";
+ outputFileName = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
}
@Override
@After
public void tearDown() throws Exception {
- Util.deleteFile(cluster, inputFileName);
- Util.deleteFile(cluster, outputFileName);
- new File(outputFileName).delete();
+ Util.deleteDirectory(new File(TESTDIR));
+ Util.deleteFile(cluster, TESTDIR);
}
private void storeAndCopyLocally(DataBag inpDB) throws Exception {
setUpInputFileOnCluster(inpDB);
String script = "a = load '" + inputFileName + "'; " +
"store a into '" + outputFileName + "' using PigStorage(':');" +
- "fs -ls /tmp";
+ "fs -ls " + TESTDIR;
pig.setBatchOn();
Util.registerMultiLineQuery(pig, script);
pig.executeBatch();
@@ -368,7 +377,15 @@ public class TestStore extends junit.fra
@Test
public void testSetStoreSchema() throws Exception {
PigServer ps = null;
- String storeSchemaOutputFile = outputFileName + "_storeSchema_test";
+ Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+ filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE);
try {
ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
@@ -378,37 +395,38 @@ public class TestStore extends junit.fra
DUMMY_STORE_CLASS_NAME + "();";
for (ExecType execType : modes) {
+ FileLocalizer.setInitialized(false);
if(execType == ExecType.MAPREDUCE) {
ps = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName);
- Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
} else {
Properties props = new Properties();
props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
ps = new PigServer(ExecType.LOCAL, props);
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName);
- Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+ if (Util.isHadoop1_0()) {
+ // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce
+ // OutputCommitter) is fixed only in 0.23.1
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE);
+ }
}
ps.setBatchOn();
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
Util.createInputFile(ps.getPigContext(),
inputFileName, inputData);
Util.registerMultiLineQuery(ps, script);
ps.executeBatch();
- assertEquals(
- "Checking if file indicating that storeSchema was " +
- "called exists in " + execType + " mode", true,
- Util.exists(ps.getPigContext(), storeSchemaOutputFile));
+ for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+ String condition = entry.getValue() ? "" : "not";
+ assertEquals("Checking if file " + entry.getKey() +
+ " does " + condition + " exists in " + execType +
+ " mode", (boolean) entry.getValue(),
+ Util.exists(ps.getPigContext(), entry.getKey()));
+ }
}
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Exception encountered - hence failing:" + e);
- } finally {
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName);
- Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
}
}
@@ -434,10 +452,7 @@ public class TestStore extends junit.fra
props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
ps = new PigServer(ExecType.LOCAL, props);
}
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName);
- Util.deleteFile(ps.getPigContext(), cleanupFailFile);
- Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
ps.setBatchOn();
Util.createInputFile(ps.getPigContext(),
inputFileName, inputData);
@@ -455,11 +470,6 @@ public class TestStore extends junit.fra
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Exception encountered - hence failing:" + e);
- } finally {
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName);
- Util.deleteFile(ps.getPigContext(), cleanupFailFile);
- Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
}
}
@@ -467,15 +477,31 @@ public class TestStore extends junit.fra
@Test
public void testCleanupOnFailureMultiStore() throws Exception {
PigServer ps = null;
- String outputFileName1 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
- String outputFileName2 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
- String cleanupSuccessFile1 = outputFileName1 + "_cleanupOnFailure_succeeded1";
- String cleanupFailFile1 = outputFileName1 + "_cleanupOnFailure_failed1";
- String cleanupSuccessFile2 = outputFileName2 + "_cleanupOnFailure_succeeded2";
- String cleanupFailFile2 = outputFileName2 + "_cleanupOnFailure_failed2";
+ String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+ String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+
+ Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
try {
- ExecType[] modes = new ExecType[] { /*ExecType.LOCAL, */ExecType.MAPREDUCE};
+ ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
// though the second store should
@@ -488,6 +514,7 @@ public class TestStore extends junit.fra
DUMMY_STORE_CLASS_NAME + "('false', '2');";
for (ExecType execType : modes) {
+ FileLocalizer.setInitialized(false);
if(execType == ExecType.MAPREDUCE) {
ps = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
@@ -495,50 +522,38 @@ public class TestStore extends junit.fra
Properties props = new Properties();
props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
ps = new PigServer(ExecType.LOCAL, props);
+ // LocalJobRunner does not call abortTask
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
+ if (Util.isHadoop1_0()) {
+ // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce
+ // OutputCommitter) is fixed only in 0.23.1
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
+ }
}
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName1);
- Util.deleteFile(ps.getPigContext(), outputFileName2);
- Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
- Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
- Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
- Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
ps.setBatchOn();
Util.createInputFile(ps.getPigContext(),
inputFileName, inputData);
Util.registerMultiLineQuery(ps, script);
ps.executeBatch();
- assertEquals(
- "Checking if file indicating that cleanupOnFailure failed " +
- " does not exists in " + execType + " mode", false,
- Util.exists(ps.getPigContext(), cleanupFailFile1));
- assertEquals(
- "Checking if file indicating that cleanupOnFailure failed " +
- " does not exists in " + execType + " mode", false,
- Util.exists(ps.getPigContext(), cleanupFailFile2));
- assertEquals(
- "Checking if file indicating that cleanupOnFailure was " +
- "successfully called exists in " + execType + " mode", true,
- Util.exists(ps.getPigContext(), cleanupSuccessFile1));
- assertEquals(
- "Checking if file indicating that cleanupOnFailure was " +
- "successfully called exists in " + execType + " mode", true,
- Util.exists(ps.getPigContext(), cleanupSuccessFile2));
+ for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+ String condition = entry.getValue() ? "" : "not";
+ assertEquals("Checking if file " + entry.getKey() +
+ " does " + condition + " exists in " + execType +
+ " mode", (boolean) entry.getValue(),
+ Util.exists(ps.getPigContext(), entry.getKey()));
+ }
}
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Exception encountered - hence failing:" + e);
- } finally {
- Util.deleteFile(ps.getPigContext(), inputFileName);
- Util.deleteFile(ps.getPigContext(), outputFileName1);
- Util.deleteFile(ps.getPigContext(), outputFileName2);
- Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
- Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
- Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
- Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
}
}
-
+
// Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs"
// property is set to true
// The test covers multi store and single store case in local and mapreduce mode
@@ -547,8 +562,7 @@ public class TestStore extends junit.fra
@Test
public void testSuccessFileCreation1() throws Exception {
PigServer ps = null;
- String[] files = new String[] { inputFileName,
- outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+
try {
ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
@@ -584,7 +598,7 @@ public class TestStore extends junit.fra
ps.getPigContext().getProperties().setProperty(
MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
Boolean.toString(isPropertySet));
- cleanupFiles(ps, files);
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
ps.setBatchOn();
Util.createInputFile(ps.getPigContext(),
inputFileName, inputData);
@@ -601,7 +615,7 @@ public class TestStore extends junit.fra
}
}
} finally {
- cleanupFiles(ps, files);
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
}
}
@@ -613,8 +627,6 @@ public class TestStore extends junit.fra
@Test
public void testSuccessFileCreation2() throws Exception {
PigServer ps = null;
- String[] files = new String[] { inputFileName,
- outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
try {
ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
@@ -658,7 +670,7 @@ public class TestStore extends junit.fra
ps.getPigContext().getProperties().setProperty(
MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
Boolean.toString(isPropertySet));
- cleanupFiles(ps, files);
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
ps.setBatchOn();
Util.createInputFile(ps.getPigContext(),
inputFileName, inputData);
@@ -682,7 +694,7 @@ public class TestStore extends junit.fra
}
}
} finally {
- cleanupFiles(ps, files);
+ Util.deleteFile(ps.getPigContext(), TESTDIR);
}
}
@@ -695,13 +707,7 @@ public class TestStore extends junit.fra
}
}
- private void cleanupFiles(PigServer ps, String... files) throws IOException {
- for(String file:files) {
- Util.deleteFile(ps.getPigContext(), file);
- }
- }
-
-
+
public static class DummyStore extends PigStorage implements StoreMetadata{
private boolean failInPutNext = false;
@@ -729,6 +735,12 @@ public class TestStore extends junit.fra
super.putNext(t);
}
+ @SuppressWarnings("rawtypes")
+ @Override
+ public OutputFormat getOutputFormat() {
+ return new DummyOutputFormat(outputFileSuffix);
+ }
+
@Override
public void storeSchema(ResourceSchema schema, String location,
Job job) throws IOException {
@@ -809,4 +821,113 @@ public class TestStore extends junit.fra
Assert.assertEquals(expected, p);
}
}
+
+ static class DummyOutputFormat extends PigTextOutputFormat {
+
+ private String outputFileSuffix;
+
+ public DummyOutputFormat(String outputFileSuffix) {
+ super((byte) '\t');
+ this.outputFileSuffix = outputFileSuffix;
+ }
+
+ @Override
+ public synchronized OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException {
+ return new DummyOutputCommitter(outputFileSuffix,
+ super.getOutputCommitter(context));
+ }
+
+ @Override
+ public Path getDefaultWorkFile(TaskAttemptContext context,
+ String extension) throws IOException {
+ FileOutputCommitter committer =
+ (FileOutputCommitter) super.getOutputCommitter(context);
+ return new Path(committer.getWorkPath(), getUniqueFile(context,
+ "part", extension));
+ }
+
+ }
+
+ static class DummyOutputCommitter extends OutputCommitter {
+
+ static String FILE_SETUPJOB_CALLED = "/tmp/TestStore/_setupJob_called";
+ static String FILE_SETUPTASK_CALLED = "/tmp/TestStore/_setupTask_called";
+ static String FILE_COMMITTASK_CALLED = "/tmp/TestStore/_commitTask_called";
+ static String FILE_ABORTTASK_CALLED = "/tmp/TestStore/_abortTask_called";
+ static String FILE_CLEANUPJOB_CALLED = "/tmp/TestStore/_cleanupJob_called";
+ static String FILE_COMMITJOB_CALLED = "/tmp/TestStore/_commitJob_called";
+ static String FILE_ABORTJOB_CALLED = "/tmp/TestStore/_abortJob_called";
+
+ private String outputFileSuffix;
+ private OutputCommitter baseCommitter;
+
+ public DummyOutputCommitter(String outputFileSuffix,
+ OutputCommitter baseCommitter) throws IOException {
+ this.outputFileSuffix = outputFileSuffix;
+ this.baseCommitter = baseCommitter;
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ baseCommitter.setupJob(jobContext);
+ createFile(jobContext, FILE_SETUPJOB_CALLED + outputFileSuffix);
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseCommitter.setupTask(taskContext);
+ createFile(taskContext, FILE_SETUPTASK_CALLED + outputFileSuffix);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return true;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseCommitter.commitTask(taskContext);
+ createFile(taskContext, FILE_COMMITTASK_CALLED + outputFileSuffix);
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseCommitter.abortTask(taskContext);
+ createFile(taskContext, FILE_ABORTTASK_CALLED + outputFileSuffix);
+ }
+
+ @Override
+ public void cleanupJob(JobContext jobContext) throws IOException {
+ baseCommitter.cleanupJob(jobContext);
+ createFile(jobContext, FILE_CLEANUPJOB_CALLED + outputFileSuffix);
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ baseCommitter.commitJob(jobContext);
+ createFile(jobContext, FILE_COMMITJOB_CALLED + outputFileSuffix);
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, State state)
+ throws IOException {
+ baseCommitter.abortJob(jobContext, state);
+ createFile(jobContext, FILE_ABORTJOB_CALLED + outputFileSuffix);
+ }
+
+ public void createFile(JobContext jobContext, String fileName)
+ throws IOException {
+ Configuration conf = jobContext.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(new Path(fileName).getParent());
+ FSDataOutputStream out = fs.create(new Path(fileName), true);
+ out.close();
+ }
+
+ }
}
Modified: pig/branches/branch-0.10/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/Util.java?rev=1387852&r1=1387851&r2=1387852&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/Util.java Thu Sep 20 04:05:38 2012
@@ -588,6 +588,10 @@ public class Util {
}
static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {
+ File parent = new File(localFileName).getParentFile();
+ if (!parent.exists()) {
+ parent.mkdirs();
+ }
PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(