You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/02 02:01:52 UTC
svn commit: r917827 [2/2] - in /hadoop/pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/zebra/src/java/org/apache/hadoop/zebra/pig/
contrib/zebra/src/test/org/apache/hadoop/zebra/pig/ src/org/apache/pig/
src/...
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=917827&r1=917826&r2=917827&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Tue Mar 2 01:01:51 2010
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import junit.framework.Assert;
@@ -30,18 +31,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
import org.apache.pig.StoreMetadata;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -51,6 +46,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
@@ -329,8 +325,8 @@
Util.registerMultiLineQuery(pig, query);
pig.executeBatch();
ResourceSchema rs = new BinStorage().getSchema(outputFileName,
- ConfigurationUtil.toConfiguration(pig.getPigContext().
- getProperties()));
+ new Job(ConfigurationUtil.toConfiguration(pig.getPigContext().
+ getProperties())));
Schema expectedSchema = Util.getSchemaFromString(
"c:chararray,i:int,d:double");
Assert.assertTrue("Checking binstorage getSchema output", Schema.equals(
@@ -399,7 +395,9 @@
Util.deleteFile(ps.getPigContext(), outputFileName);
Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
} else {
- ps = new PigServer(ExecType.LOCAL);
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ ps = new PigServer(ExecType.LOCAL, props);
Util.deleteFile(ps.getPigContext(), inputFileName);
Util.deleteFile(ps.getPigContext(), outputFileName);
Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
@@ -424,61 +422,195 @@
}
}
- public static class DummyStore extends StoreFunc implements StoreMetadata{
-
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
+ @Test
+ public void testCleanupOnFailure() throws Exception {
+ PigServer ps = null;
+ String cleanupSuccessFile = outputFileName + "_cleanupOnFailure_succeeded";
+ String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed";
+ try {
+ ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
+ String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+ String script = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName + "' using " +
+ DummyStore.class.getName() + "('true');";
+
+ for (ExecType execType : modes) {
+ if(execType == ExecType.MAPREDUCE) {
+ ps = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ } else {
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ ps = new PigServer(ExecType.LOCAL, props);
+ }
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), cleanupFailFile);
+ Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure failed " +
+ " does not exists in " + execType + " mode", false,
+ Util.exists(ps.getPigContext(), cleanupFailFile));
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure was " +
+ "successfully called exists in " + execType + " mode", true,
+ Util.exists(ps.getPigContext(), cleanupSuccessFile));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Exception encountered - hence failing:" + e);
+ } finally {
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), cleanupFailFile);
+ Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
}
-
- @Override
- public OutputFormat getOutputFormat() throws IOException {
- // we don't really write in the test - so this is just to keep
- // Pig/hadoop happy
- return new TextOutputFormat<Long, Text>();
- }
-
- @Override
- public void prepareToWrite(RecordWriter writer) throws IOException {
+ }
+
+
+ @Test
+ public void testCleanupOnFailureMultiStore() throws Exception {
+ PigServer ps = null;
+ String outputFileName1 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+ String outputFileName2 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+ String cleanupSuccessFile1 = outputFileName1 + "_cleanupOnFailure_succeeded1";
+ String cleanupFailFile1 = outputFileName1 + "_cleanupOnFailure_failed1";
+ String cleanupSuccessFile2 = outputFileName2 + "_cleanupOnFailure_succeeded2";
+ String cleanupFailFile2 = outputFileName2 + "_cleanupOnFailure_failed2";
+
+ try {
+ ExecType[] modes = new ExecType[] { /*ExecType.LOCAL, */ExecType.MAPREDUCE};
+ String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+ // though the second store should
+ // not cause a failure, the first one does and the result should be
+ // that both stores are considered to have failed
+ String script = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName1 + "' using " +
+ DummyStore.class.getName() + "('true', '1');" +
+ "store a into '" + outputFileName2 + "' using " +
+ DummyStore.class.getName() + "('false', '2');";
+
+ for (ExecType execType : modes) {
+ if(execType == ExecType.MAPREDUCE) {
+ ps = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ } else {
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ ps = new PigServer(ExecType.LOCAL, props);
+ }
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName1);
+ Util.deleteFile(ps.getPigContext(), outputFileName2);
+ Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
+ Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
+ Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
+ Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure failed " +
+ " does not exists in " + execType + " mode", false,
+ Util.exists(ps.getPigContext(), cleanupFailFile1));
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure failed " +
+ " does not exists in " + execType + " mode", false,
+ Util.exists(ps.getPigContext(), cleanupFailFile2));
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure was " +
+ "successfully called exists in " + execType + " mode", true,
+ Util.exists(ps.getPigContext(), cleanupSuccessFile1));
+ assertEquals(
+ "Checking if file indicating that cleanupOnFailure was " +
+ "successfully called exists in " + execType + " mode", true,
+ Util.exists(ps.getPigContext(), cleanupSuccessFile2));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Exception encountered - hence failing:" + e);
+ } finally {
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName1);
+ Util.deleteFile(ps.getPigContext(), outputFileName2);
+ Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
+ Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
+ Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
+ Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
}
+ }
+ public static class DummyStore extends PigStorage implements StoreMetadata{
- @Override
- public void putNext(Tuple t) throws IOException {
- // we don't really write anything out
- }
+ private boolean failInPutNext = false;
+
+ private String outputFileSuffix= "";
- @Override
- public String relToAbsPathForStoreLocation(String location, Path curDir)
- throws IOException {
- return location;
+ public DummyStore(String failInPutNextStr) {
+ failInPutNext = Boolean.parseBoolean(failInPutNextStr);
}
-
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
-
+
+ public DummyStore(String failInPutNextStr, String outputFileSuffix) {
+ failInPutNext = Boolean.parseBoolean(failInPutNextStr);
+ this.outputFileSuffix = outputFileSuffix;
}
+
+ public DummyStore() {
+ }
+
@Override
- public void setStoreLocation(String location, Job job)
- throws IOException {
- FileOutputFormat.setOutputPath(job, new Path(location));
+ public void putNext(Tuple t) throws IOException {
+ if(failInPutNext) {
+ throw new IOException("Failing in putNext");
+ }
+ super.putNext(t);
}
@Override
public void storeSchema(ResourceSchema schema, String location,
- Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ Job job) throws IOException {
+ FileSystem fs = FileSystem.get(job.getConfiguration());
// create a file to test that this method got called - if it gets called
// multiple times, the create will throw an Exception
fs.create(
- new Path(conf.get("mapred.output.dir") + "_storeSchema_test"),
+ new Path(location + "_storeSchema_test"),
false);
}
@Override
+ public void cleanupOnFailure(String location, Job job)
+ throws IOException {
+ super.cleanupOnFailure(location, job);
+
+ // check that the output file location is not present
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ if(fs.exists(new Path(location))) {
+            // create a file to indicate that the cleanup did not happen
+ fs.create(new Path(location + "_cleanupOnFailure_failed" +
+ outputFileSuffix), false);
+ }
+ // create a file to test that this method got called successfully
+ // if it gets called multiple times, the create will throw an Exception
+ fs.create(
+ new Path(location + "_cleanupOnFailure_succeeded" +
+ outputFileSuffix), false);
+ }
+
+ @Override
public void storeStatistics(ResourceStatistics stats, String location,
- Configuration conf) throws IOException {
+ Job job) throws IOException {
+ // TODO Auto-generated method stub
+
}
}