You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by ju...@apache.org on 2012/07/26 22:49:10 UTC
svn commit: r1366193 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/mock/Storage.java test/org/apache/pig/builtin/mock/TestMockStorage.java
Author: julien
Date: Thu Jul 26 20:49:09 2012
New Revision: 1366193
URL: http://svn.apache.org/viewvc?rev=1366193&view=rev
Log:
PIG-2839: mock.Storage overwrites output with the last relation written when storing UNION (julien)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1366193&r1=1366192&r2=1366193&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jul 26 20:49:09 2012
@@ -208,6 +208,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2839: mock.Storage overwrites output with the last relation written when storing UNION (julien)
+
PIG-2840: Fix SchemaTuple bugs (jcoveney)
PIG-2842: TestNewPlanOperatorPlan fails when new Configuration() picks up a previous minicluster conf file (julien)
Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1366193&r1=1366192&r2=1366193&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Thu Jul 26 20:49:09 2012
@@ -1,5 +1,7 @@
package org.apache.pig.builtin.mock;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getUniqueFile;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -11,7 +13,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -171,13 +175,39 @@ public class Storage extends LoadFunc im
return data;
}
+ private static class Parts {
+ final String location;
+ final Map<String, Collection<Tuple>> parts = new HashMap<String, Collection<Tuple>>();
+
+ public Parts(String location) {
+ super();
+ this.location = location;
+ }
+
+ public void set(String partFile, Collection<Tuple> data) {
+ if (parts.put(partFile, data) != null) {
+ throw new RuntimeException("the part " + partFile + " for location " + location + " already exists");
+ }
+ }
+
+ public List<Tuple> getAll() {
+ List<Tuple> all = new ArrayList<Tuple>();
+ Set<Entry<String, Collection<Tuple>>> entrySet = parts.entrySet();
+ for (Entry<String, Collection<Tuple>> entry : entrySet) {
+ all.addAll(entry.getValue());
+ }
+ return all;
+ }
+
+ }
+
/**
* An isolated data store to avoid side effects
*
*/
public static class Data implements Serializable {
private static final long serialVersionUID = 1L;
- private Map<String, Collection<Tuple>> locationToData = new HashMap<String, Collection<Tuple>>();
+ private Map<String, Parts> locationToData = new HashMap<String, Parts>();
private Map<String, Schema> locationToSchema = new HashMap<String, Schema>();
/**
@@ -213,7 +243,9 @@ public class Storage extends LoadFunc im
*/
public void set(String location, Schema schema, Collection<Tuple> data) {
set(location, data);
- locationToSchema.put(location, schema);
+ if (locationToSchema.put(location, schema) != null) {
+ throw new RuntimeException("schema already set for location "+location);
+ }
}
/**
@@ -233,8 +265,30 @@ public class Storage extends LoadFunc im
* @param location "where" to store the tuples
* @param data the tuples to store
*/
+ private void setInternal(String location, String partID, Collection<Tuple> data) {
+ Parts parts = locationToData.get(location);
+ if (partID == null) {
+ if (parts == null) {
+ partID = "mock";
+ } else {
+ throw new RuntimeException("Can not set location " + location + " twice");
+ }
+ }
+ if (parts == null) {
+ parts = new Parts(location);
+ locationToData.put(location, parts);
+ }
+ parts.set(partID, data);
+ }
+
+ /**
+ * to set the data in a location
+ *
+ * @param location "where" to store the tuples
+ * @param data the tuples to store
+ */
public void set(String location, Collection<Tuple> data) {
- locationToData.put(location, data);
+ setInternal(location, null, data);
}
/**
@@ -244,7 +298,7 @@ public class Storage extends LoadFunc im
* @param data the tuples to store
*/
public void set(String location, Tuple... data) {
- set(location, Arrays.asList(data));
+ set(location, Arrays.asList(data));
}
/**
@@ -256,8 +310,7 @@ public class Storage extends LoadFunc im
if (!locationToData.containsKey(location)) {
throw new RuntimeException("No data for location '" + location + "'");
}
- Collection<Tuple> collection = locationToData.get(location);
- return collection instanceof List ? (List<Tuple>)collection : new ArrayList<Tuple>(collection);
+ return locationToData.get(location).getAll();
}
/**
@@ -286,9 +339,8 @@ public class Storage extends LoadFunc im
private Schema schema;
- private List<Tuple> dataBeingWritten;
-
private Iterator<Tuple> dataBeingRead;
+private MockRecordWriter mockRecordWriter;
private void init(String location, Job job) throws IOException {
this.data = getData(job);
@@ -385,13 +437,13 @@ public class Storage extends LoadFunc im
@Override
public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
- this.dataBeingWritten = new ArrayList<Tuple>();
- this.data.set(location, dataBeingWritten);
+ mockRecordWriter = (MockRecordWriter) writer;
+ this.data.setInternal(location, mockRecordWriter.partID, mockRecordWriter.dataBeingWritten);
}
@Override
public void putNext(Tuple t) throws IOException {
- this.dataBeingWritten.add(t);
+ mockRecordWriter.dataBeingWritten.add(t);
}
@Override
@@ -517,8 +569,16 @@ public class Storage extends LoadFunc im
// mocks for StoreFunc
private static final class MockRecordWriter extends RecordWriter<Object, Object> {
+ private final List<Tuple> dataBeingWritten = new ArrayList<Tuple>();
+ private final String partID;
+
+ public MockRecordWriter(String partID) {
+ super();
+ this.partID = partID;
+ }
+
@Override
- public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
}
@Override
@@ -567,7 +627,7 @@ public class Storage extends LoadFunc im
@Override
public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext arg0) throws IOException,
InterruptedException {
- return new MockRecordWriter();
+ return new MockRecordWriter(getUniqueFile(arg0, "part", ".mock"));
}
}
Modified: pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java?rev=1366193&r1=1366192&r2=1366193&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java Thu Jul 26 20:49:09 2012
@@ -3,10 +3,14 @@ package org.apache.pig.builtin.mock;
import static junit.framework.Assert.*;
import static org.apache.pig.builtin.mock.Storage.*;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.Utils;
@@ -50,11 +54,101 @@ public class TestMockStorage {
pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
-
+
List<Tuple> out = data.get("bar");
assertEquals(tuple("a", "a"), out.get(0));
assertEquals(tuple("b", "b"), out.get(1));
assertEquals(tuple("c", "c"), out.get(2));
}
+ @Test
+ public void testMockStoreUnion() throws Exception {
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ Data data = resetData(pigServer);
+
+ data.set("input1",
+ tuple("a"),
+ tuple("b"),
+ tuple("c")
+ );
+
+ data.set("input2",
+ tuple("d"),
+ tuple("e"),
+ tuple("f")
+ );
+
+ pigServer.registerQuery("A = LOAD 'input1' USING mock.Storage();");
+ pigServer.registerQuery("B = LOAD 'input2' USING mock.Storage();");
+ pigServer.registerQuery("C = UNION A, B;");
+ pigServer.registerQuery("STORE C INTO 'output' USING mock.Storage();");
+
+ List<Tuple> out = data.get("output");
+ assertEquals(out + " size", 6, out.size());
+ Set<String> set = new HashSet<String>();
+ for (Tuple tuple : out) {
+ if (!set.add((String)tuple.get(0))) {
+ fail(tuple.get(0) + " is present twice in " + out);
+ }
+ }
+
+ assertTrue(set + " contains a", set.contains("a"));
+ assertTrue(set + " contains b", set.contains("b"));
+ assertTrue(set + " contains c", set.contains("c"));
+ assertTrue(set + " contains d", set.contains("d"));
+ assertTrue(set + " contains e", set.contains("e"));
+ assertTrue(set + " contains f", set.contains("f"));
+ }
+
+ @Test
+ public void testBadUsage1() throws Exception {
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ Data data = resetData(pigServer);
+
+ data.set("input1",
+ tuple("a"),
+ tuple("b"),
+ tuple("c")
+ );
+
+ try {
+ data.set("input1",
+ tuple("d"),
+ tuple("e"),
+ tuple("f")
+ );
+ fail("should have thrown an exception for setting twice the same input");
+ } catch (RuntimeException e) {
+ assertEquals("Can not set location input1 twice", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBadUsage2() throws Exception {
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ Data data = resetData(pigServer);
+
+ data.set("input",
+ tuple("a"),
+ tuple("b"),
+ tuple("c")
+ );
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery(
+ "A = LOAD 'input' USING mock.Storage();"
+ +"B = LOAD 'input' USING mock.Storage();"
+ +"STORE A INTO 'output' USING mock.Storage();"
+ +"STORE B INTO 'output' USING mock.Storage();");
+ List<ExecJob> results = pigServer.executeBatch();
+ boolean failed = false;
+ for (ExecJob execJob : results) {
+ if (execJob.getStatus() == JOB_STATUS.FAILED) {
+ failed = true;
+ break;
+ }
+ }
+ assertTrue("job should have failed for storing twice in the same location", failed);
+
+ }
}