You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ju...@apache.org on 2012/04/27 02:34:30 UTC

svn commit: r1331158 - in /pig/branches/branch-0.10: ./ src/org/apache/pig/builtin/mock/ test/org/apache/pig/builtin/ test/org/apache/pig/builtin/mock/

Author: julien
Date: Fri Apr 27 00:34:29 2012
New Revision: 1331158

URL: http://svn.apache.org/viewvc?rev=1331158&view=rev
Log:
PIG-2650: Convenience mock Loader and Storer to simplify unit testing of Pig scripts (julien)

Added:
    pig/branches/branch-0.10/src/org/apache/pig/builtin/mock/
    pig/branches/branch-0.10/src/org/apache/pig/builtin/mock/Storage.java
    pig/branches/branch-0.10/test/org/apache/pig/builtin/
    pig/branches/branch-0.10/test/org/apache/pig/builtin/mock/
    pig/branches/branch-0.10/test/org/apache/pig/builtin/mock/TestMockStorage.java
Modified:
    pig/branches/branch-0.10/CHANGES.txt
    pig/branches/branch-0.10/ivy.xml

Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1331158&r1=1331157&r2=1331158&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Fri Apr 27 00:34:29 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2650: Convenience mock Loader and Storer to simplify unit testing of Pig scripts (julien)
+
 PIG-2541: Automatic record provenance (source tagging) for PigStorage (prkommireddi via daijy)
 
 PIG-2601: Additional document for 0.10 (daijy)

Modified: pig/branches/branch-0.10/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/ivy.xml?rev=1331158&r1=1331157&r2=1331158&view=diff
==============================================================================
--- pig/branches/branch-0.10/ivy.xml (original)
+++ pig/branches/branch-0.10/ivy.xml Fri Apr 27 00:34:29 2012
@@ -176,7 +176,7 @@
     <dependency org="net.java.dev.javacc" name="javacc" rev="${javacc.version}"
       conf="compile->master"/>
     <dependency org="junit" name="junit" rev="${junit.version}"
-      conf="test->default"/>
+      conf="compile->master"/>
     <dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}"
       conf="releaseaudit->default"/>
     <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}"

Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/mock/Storage.java?rev=1331158&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/mock/Storage.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/mock/Storage.java Fri Apr 27 00:34:29 2012
@@ -0,0 +1,581 @@
+package org.apache.pig.builtin.mock;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.schema;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+import org.apache.pig.ExecType;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+/**
+ * A convenient mock Storage for unit tests: a LoadFunc/StoreFunc that reads and
+ * writes tuples kept in memory instead of files.
+ * <p>
+ * Tuples are registered per PigContext through {@link #resetData(PigServer)}, which
+ * records an id under the property {@code pig.mock.storage.id}; at load/store time the
+ * id is read back from the job configuration to find the matching {@link Data}.
+ * The data itself lives in a static map of the current JVM, so this presumably only
+ * works when the job runs in that same JVM (e.g. {@code ExecType.LOCAL}, as below).
+ *
+ * <pre>
+ *  PigServer pigServer = new PigServer(ExecType.LOCAL);
+ *  Data data = resetData(pigServer);
+ *  data.set("foo",
+ *      tuple("a"),
+ *      tuple("b"),
+ *      tuple("c")
+ *      );
+ *
+ *  pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
+ *  pigServer.registerQuery("STORE A INTO 'bar' USING mock.Storage();");
+ *
+ *  List&lt;Tuple&gt; out = data.get("bar");
+ *
+ *  assertEquals(tuple("a"), out.get(0));
+ *  assertEquals(tuple("b"), out.get(1));
+ *  assertEquals(tuple("c"), out.get(2));
+ * </pre>
+ * With Schema:
+ * <pre>
+ *  PigServer pigServer = new PigServer(ExecType.LOCAL);
+ *  Data data = resetData(pigServer);
+ *
+ *  data.set("foo", "blah:chararray",
+ *      tuple("a"),
+ *      tuple("b"),
+ *      tuple("c")
+ *      );
+ *
+ *  pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
+ *  pigServer.registerQuery("B = FOREACH A GENERATE blah as a, blah as b;");
+ *  pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
+ *
+ *  assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
+ *
+ *  List&lt;Tuple&gt; out = data.get("bar");
+ *  assertEquals(tuple("a", "a"), out.get(0));
+ *  assertEquals(tuple("b", "b"), out.get(1));
+ *  assertEquals(tuple("c", "c"), out.get(2));
+ * </pre>
+ */
+public class Storage extends LoadFunc implements StoreFuncInterface, LoadMetadata, StoreMetadata {
+  /** Property (PigContext / job configuration) holding the id of the script's {@link Data}. */
+  private static final String PIG_CONTEXT_KEY = "pig.mock.storage.id";
+  private static final Logger LOG = Logger.getLogger(Storage.class);
+  /** Every live data store, keyed by the id handed out by {@link #resetData(PigContext)}. */
+  private static final Map<Integer, Data> idToData = new HashMap<Integer, Data>();
+  private static final TupleFactory TF = TupleFactory.getInstance();
+  private static final BagFactory BF = BagFactory.getInstance();
+
+  /** Next id handed out by {@link #resetData(PigContext)}. */
+  private static int nextId;
+
+  /**
+   * Builds a tuple whose fields are the given values, in order.
+   *
+   * @param objects the fields of the tuple
+   * @return a tuple containing the provided objects
+   */
+  public static Tuple tuple(Object... objects) {
+    return TF.newTuple(Arrays.asList(objects));
+  }
+
+  /**
+   * Builds a bag holding the given tuples.
+   *
+   * @param tuples the tuples to put in the bag
+   * @return a bag containing the provided tuples
+   */
+  public static DataBag bag(Tuple... tuples) {
+    return BF.newDefaultBag(Arrays.asList(tuples));
+  }
+
+  /**
+   * Parses a textual schema such as {@code "a:chararray,b:chararray"}.
+   *
+   * @param schema the schema string
+   * @return the schema represented by the string
+   * @throws ParserException if the schema is invalid
+   */
+  public static Schema schema(String schema) throws ParserException {
+    return Utils.getSchemaFromString(schema);
+  }
+
+  /**
+   * Resets the store for this server's PigContext and returns the new, empty Data.
+   *
+   * @param pigServer the server whose scripts will use mock.Storage
+   * @return the Data backing mock.Storage for scripts of this server
+   */
+  public static Data resetData(PigServer pigServer) {
+    return resetData(pigServer.getPigContext());
+  }
+
+  /**
+   * Resets the store for this PigContext and returns the new, empty Data.
+   * The Data previously registered for this context (if any) is discarded.
+   *
+   * @param context the PigContext whose scripts will use mock.Storage
+   * @return the Data backing mock.Storage for scripts of this context
+   */
+  public static Data resetData(PigContext context) {
+    Properties properties = context.getProperties();
+
+    // cleaning up previous data.
+    // Note: Properties.contains() (inherited from Hashtable) tests values, not keys,
+    // so the key must be looked up explicitly or stale Data is never released.
+    String previousId = properties.getProperty(PIG_CONTEXT_KEY);
+    if (previousId != null) {
+      try {
+        idToData.remove(Integer.valueOf(previousId));
+      } catch (NumberFormatException e) {
+        LOG.warn("invalid id in context properties for "+PIG_CONTEXT_KEY, e);
+      }
+    }
+
+    // setting new Store
+    int id = nextId++;
+    properties.setProperty(PIG_CONTEXT_KEY, String.valueOf(id));
+    Data data = new Data();
+    idToData.put(id, data);
+    return data;
+  }
+
+  /**
+   * Finds the Data registered for the script that owns this job.
+   *
+   * @param job the job whose configuration carries the data id
+   * @return the Data of this script
+   * @throws IOException if no id is configured, the id is malformed, or its Data
+   *         has been discarded by a later resetData call
+   */
+  private Data getData(Job job) throws IOException {
+    String stringId = job.getConfiguration().get(PIG_CONTEXT_KEY);
+    if (stringId == null) {
+      throw new IOException("no Data prepared for this Script. " +
+          "You need to call Storage.resetData(pigServer.getPigContext()) first");
+    }
+    Integer id;
+    try {
+      id = Integer.valueOf(stringId);
+    } catch (NumberFormatException e) {
+      throw new IOException("invalid id '" + stringId + "' in job configuration for "
+          + PIG_CONTEXT_KEY, e);
+    }
+    Data data = idToData.get(id);
+    if (data == null) {
+      throw new IOException("no Data anymore for this Script. " +
+          "Has data been reset by another Storage.resetData(pigServer.getPigContext()) ?");
+    }
+    return data;
+  }
+
+  /**
+   * An isolated data store to avoid side effects between scripts: maps a location
+   * name to its tuples and, optionally, to its schema.
+   */
+  public static class Data implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Map<String, Collection<Tuple>> locationToData = new HashMap<String, Collection<Tuple>>();
+    private Map<String, Schema> locationToSchema = new HashMap<String, Schema>();
+
+    /**
+     * Sets the data in a location with a known schema.
+     *
+     * @param location "where" to store the tuples
+     * @param schema the schema of the data, as a string
+     * @param data the tuples to store
+     * @throws ParserException if schema is invalid
+     */
+    public void set(String location, String schema, Collection<Tuple> data) throws ParserException {
+      set(location, Utils.getSchemaFromString(schema), data);
+    }
+
+    /**
+     * Sets the data in a location with a known schema.
+     *
+     * @param location "where" to store the tuples
+     * @param schema the schema of the data, as a string
+     * @param data the tuples to store
+     * @throws ParserException if schema is invalid
+     */
+    public void set(String location, String schema, Tuple... data) throws ParserException {
+      set(location, Utils.getSchemaFromString(schema), Arrays.asList(data));
+    }
+
+    /**
+     * Sets the data in a location with a known schema.
+     *
+     * @param location "where" to store the tuples
+     * @param schema the schema of the data
+     * @param data the tuples to store
+     */
+    public void set(String location, Schema schema, Collection<Tuple> data) {
+      set(location, data);
+      locationToSchema.put(location, schema);
+    }
+
+    /**
+     * Sets the data in a location with a known schema.
+     *
+     * @param location "where" to store the tuples
+     * @param schema the schema of the data
+     * @param data the tuples to store
+     */
+    public void set(String location, Schema schema, Tuple... data) {
+      set(location, schema, Arrays.asList(data));
+    }
+
+    /**
+     * Sets the data in a location. The collection is stored as given (not copied),
+     * and any schema already registered for the location is left untouched.
+     *
+     * @param location "where" to store the tuples
+     * @param data the tuples to store
+     */
+    public void set(String location, Collection<Tuple> data) {
+      locationToData.put(location, data);
+    }
+
+    /**
+     * Sets the data in a location.
+     *
+     * @param location "where" to store the tuples
+     * @param data the tuples to store
+     */
+    public void set(String location, Tuple... data) {
+      set(location, Arrays.asList(data));
+    }
+
+    /**
+     * Returns the data in a location.
+     *
+     * @param location the location to read
+     * @return the data in this location: the stored list itself when a List was
+     *         stored, otherwise a new list copied from the stored collection
+     * @throws RuntimeException if no data was ever set for this location
+     */
+    public List<Tuple> get(String location) {
+      if (!locationToData.containsKey(location)) {
+        throw new RuntimeException("No data for location '" + location + "'");
+      }
+      Collection<Tuple> collection = locationToData.get(location);
+      return collection instanceof List ? (List<Tuple>) collection : new ArrayList<Tuple>(collection);
+    }
+
+    /**
+     * @param location the location to look up
+     * @return the schema stored for this location, or null if none was set
+     */
+    public Schema getSchema(String location) {
+      return locationToSchema.get(location);
+    }
+
+    /**
+     * Sets the schema for a given location.
+     *
+     * @param location the location to describe
+     * @param schema its schema
+     */
+    public void setSchema(String location, Schema schema) {
+      locationToSchema.put(location, schema);
+    }
+
+  }
+
+  /** The location (name) this instance loads from or stores into. */
+  private String location;
+
+  /** The Data of the current script, resolved from the job configuration. */
+  private Data data;
+
+  /** The schema registered for {@link #location} when init ran, or null. */
+  private Schema schema;
+
+  /** Tuples written by the current store; registered in {@link #data} by prepareToWrite. */
+  private List<Tuple> dataBeingWritten;
+
+  /** Iterator over the tuples being loaded; set by setLocation. */
+  private Iterator<Tuple> dataBeingRead;
+
+  /**
+   * Resolves the script's Data from the job and records the location and its schema.
+   */
+  private void init(String location, Job job) throws IOException {
+    this.data = getData(job);
+    this.location = location;
+    this.schema = data.getSchema(location);
+  }
+
+  // LoadFunc
+
+  /** Locations are plain names here, so they are recorded and returned unchanged. */
+  @Override
+  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+    this.location = location;
+    return location;
+  }
+
+  /** Binds this loader to the location and restarts iteration over its tuples. */
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    init(location, job);
+    this.dataBeingRead = data.get(location).iterator();
+  }
+
+  @Override
+  public InputFormat<?, ?> getInputFormat() throws IOException {
+    return new MockInputFormat(location);
+  }
+
+  @Override
+  public LoadCaster getLoadCaster() throws IOException {
+    return super.getLoadCaster();
+  }
+
+  /** The record reader is ignored: tuples come from the in-memory iterator instead. */
+  @Override
+  public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) throws IOException {
+  }
+
+  /**
+   * @return the next in-memory tuple, or null once all tuples have been returned
+   * @throws IOException if setLocation was never called
+   */
+  @Override
+  public Tuple getNext() throws IOException {
+    if (dataBeingRead == null) {
+      throw new IOException("data was not correctly initialized in MockLoader");
+    }
+    return dataBeingRead.hasNext() ? dataBeingRead.next() : null;
+  }
+
+  @Override
+  public void setUDFContextSignature(String signature) {
+    super.setUDFContextSignature(signature);
+  }
+
+  // LoadMetaData
+
+  /** @return the schema registered for the location, or null if none was set */
+  @Override
+  public ResourceSchema getSchema(String location, Job job) throws IOException {
+    init(location, job);
+    return schema == null ? null : new ResourceSchema(schema);
+  }
+
+  /** No statistics are tracked; always returns null. */
+  @Override
+  public ResourceStatistics getStatistics(String location, Job job)
+      throws IOException {
+    init(location, job);
+    return null;
+  }
+
+  /** Partitioning is not supported; always returns null. */
+  @Override
+  public String[] getPartitionKeys(String location, Job job) throws IOException {
+    init(location, job);
+    return null;
+  }
+
+  @Override
+  public void setPartitionFilter(Expression partitionFilter) throws IOException {
+  }
+
+  // StoreFunc
+
+  /** Locations are plain names here, so they are recorded and returned unchanged. */
+  @Override
+  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+    this.location = location;
+    return location;
+  }
+
+  @Override
+  public OutputFormat<?, ?> getOutputFormat() throws IOException {
+    return new MockOutputFormat();
+  }
+
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    init(location, job);
+  }
+
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+  }
+
+  /**
+   * Starts a fresh output list and registers it right away as the location's data,
+   * so tuples added by putNext are visible through {@link Data#get(String)}.
+   */
+  @Override
+  public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
+    this.dataBeingWritten = new ArrayList<Tuple>();
+    this.data.set(location, dataBeingWritten);
+  }
+
+  @Override
+  public void putNext(Tuple t) throws IOException {
+    this.dataBeingWritten.add(t);
+  }
+
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+  }
+
+  @Override
+  public void cleanupOnFailure(String location, Job job) throws IOException {
+    init(location, job);
+  }
+
+  // StoreMetaData
+
+  /** Statistics are not recorded. */
+  @Override
+  public void storeStatistics(ResourceStatistics stats, String location, Job job)
+      throws IOException {
+    init(location, job);
+  }
+
+  /** Records the output schema so tests can read it back via {@link Data#getSchema(String)}. */
+  @Override
+  public void storeSchema(ResourceSchema schema, String location, Job job)
+      throws IOException {
+    init(location, job);
+    data.setSchema(location, Schema.getPigSchema(schema));
+  }
+
+  // Mocks for LoadFunc
+
+  /**
+   * Placeholder reader returning constant keys/values; this loader never reads from
+   * it (see prepareToRead/getNext).
+   */
+  private static class MockRecordReader extends RecordReader<Object, Object> {
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return "mockKey";
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return "mockValue";
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0.5f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
+        InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return true;
+    }
+  }
+
+  /** A single split carrying only the location name. */
+  private static class MockInputSplit extends InputSplit implements Writable  {
+    private String location;
+
+    // used through reflection by Hadoop
+    @SuppressWarnings("unused")
+    public MockInputSplit() {
+    }
+
+    public MockInputSplit(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[] { location };
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 10000000;
+    }
+
+    @Override
+    public boolean equals(Object arg0) {
+      return arg0==this;
+    }
+
+    /** Null-safe: a split built by the no-arg constructor has no location yet. */
+    @Override
+    public int hashCode() {
+      return location == null ? 0 : location.hashCode();
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+      location = arg0.readUTF();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+      arg0.writeUTF(location);
+    }
+  }
+
+  /** Produces exactly one {@link MockInputSplit} for the location. */
+  private static class MockInputFormat extends InputFormat<Object, Object> {
+
+    private final String location;
+
+    public MockInputFormat(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public RecordReader<Object, Object> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+      return new MockRecordReader();
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+      return Arrays.<InputSplit>asList(new MockInputSplit(location));
+    }
+  }
+
+  // mocks for StoreFunc
+
+  /** Discards everything; tuples are captured by putNext instead. */
+  private static final class MockRecordWriter extends RecordWriter<Object, Object> {
+
+    @Override
+    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public void write(Object arg0, Object arg1) throws IOException, InterruptedException {
+    }
+
+  }
+
+  /** Committer whose steps are all no-ops. */
+  private static class MockOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void abortTask(TaskAttemptContext arg0) throws IOException {
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext arg0) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+      return true;
+    }
+
+    @Override
+    public void setupJob(JobContext arg0) throws IOException {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext arg0) throws IOException {
+    }
+
+  }
+
+  /** Output format with no output-spec checks, a no-op committer and a discarding writer. */
+  private static final class MockOutputFormat extends OutputFormat<Object, Object> {
+
+    @Override
+    public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException,
+        InterruptedException {
+      return new MockOutputCommitter();
+    }
+
+    @Override
+    public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext arg0) throws IOException,
+        InterruptedException {
+      return new MockRecordWriter();
+    }
+
+  }
+
+}

Added: pig/branches/branch-0.10/test/org/apache/pig/builtin/mock/TestMockStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/builtin/mock/TestMockStorage.java?rev=1331158&view=auto
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/builtin/mock/TestMockStorage.java (added)
+++ pig/branches/branch-0.10/test/org/apache/pig/builtin/mock/TestMockStorage.java Fri Apr 27 00:34:29 2012
@@ -0,0 +1,60 @@
+package org.apache.pig.builtin.mock;
+
+import static junit.framework.Assert.*;
+import static org.apache.pig.builtin.mock.Storage.*;
+
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Test;
+
+/**
+ * Tests for {@link Storage}: tuples set in a {@link Data} are loaded by a script and
+ * whatever it stores is readable back from the same Data.
+ */
+public class TestMockStorage {
+
+  /** Round-trips three single-field tuples through mock.Storage without a schema. */
+  @Test
+  public void testMockStoreAndLoad() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    Data data = resetData(pigServer);
+
+    data.set("foo",
+        tuple("a"),
+        tuple("b"),
+        tuple("c")
+        );
+
+    pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
+    pigServer.registerQuery("STORE A INTO 'bar' USING mock.Storage();");
+
+    List<Tuple> out = data.get("bar");
+    // exact size first, so extra or missing output tuples are caught
+    assertEquals(3, out.size());
+    assertEquals(tuple("a"), out.get(0));
+    assertEquals(tuple("b"), out.get(1));
+    assertEquals(tuple("c"), out.get(2));
+  }
+
+  /** Checks that a load schema is usable by name and that the stored schema is recorded. */
+  @Test
+  public void testMockSchema() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    Data data = resetData(pigServer);
+
+    data.set("foo", "blah:chararray",
+        tuple("a"),
+        tuple("b"),
+        tuple("c")
+        );
+
+    pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
+    pigServer.registerQuery("B = FOREACH A GENERATE blah as a, blah as b;");
+    pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
+
+    assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
+
+    List<Tuple> out = data.get("bar");
+    // exact size first, so extra or missing output tuples are caught
+    assertEquals(3, out.size());
+    assertEquals(tuple("a", "a"), out.get(0));
+    assertEquals(tuple("b", "b"), out.get(1));
+    assertEquals(tuple("c", "c"), out.get(2));
+  }
+
+}