Posted to commits@pig.apache.org by ju...@apache.org on 2012/07/26 22:49:10 UTC

svn commit: r1366193 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/mock/Storage.java test/org/apache/pig/builtin/mock/TestMockStorage.java

Author: julien
Date: Thu Jul 26 20:49:09 2012
New Revision: 1366193

URL: http://svn.apache.org/viewvc?rev=1366193&view=rev
Log:
PIG-2839: mock.Storage overwrites output with the last relation written when storing UNION (julien)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
    pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1366193&r1=1366192&r2=1366193&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jul 26 20:49:09 2012
@@ -208,6 +208,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-2839: mock.Storage overwrites output with the last relation written when storing UNION (julien)
+
 PIG-2840: Fix SchemaTuple bugs (jcoveney)
 
 PIG-2842: TestNewPlanOperatorPlan fails when new Configuration() picks up a previous minicluster conf file (julien)
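
For readers skimming the diffs below, a minimal standalone sketch (plain Java, not the actual Pig classes; the class name and part names are illustrative) of the failure mode named in PIG-2839 and of the shape of the fix: before this patch each StoreFunc writer registered its tuples keyed only by location, so the writer for the second branch of a UNION replaced the output of the first; after it, every writer registers under a unique part name and get() merges all parts.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UnionOverwriteSketch {
      public static void main(String[] args) {
        // Old behaviour: one collection per location, so the last writer wins.
        Map<String, List<String>> byLocation = new HashMap<String, List<String>>();
        byLocation.put("output", Arrays.asList("a", "b", "c")); // writer for relation A
        byLocation.put("output", Arrays.asList("d", "e", "f")); // writer for relation B clobbers A
        System.out.println(byLocation.get("output"));           // prints [d, e, f]

        // New behaviour: one collection per (location, part name), merged on read.
        Map<String, Map<String, List<String>>> byLocationAndPart =
            new HashMap<String, Map<String, List<String>>>();
        Map<String, List<String>> parts = new HashMap<String, List<String>>();
        parts.put("part-m-00000.mock", Arrays.asList("a", "b", "c"));
        parts.put("part-m-00001.mock", Arrays.asList("d", "e", "f"));
        byLocationAndPart.put("output", parts);

        List<String> all = new ArrayList<String>();
        for (List<String> part : byLocationAndPart.get("output").values()) {
          all.addAll(part);
        }
        System.out.println(all); // all six values survive
      }
    }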

Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1366193&r1=1366192&r2=1366193&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Thu Jul 26 20:49:09 2012
@@ -1,5 +1,7 @@
 package org.apache.pig.builtin.mock;
 
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getUniqueFile;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -11,7 +13,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -171,13 +175,39 @@ public class Storage extends LoadFunc im
     return data;
   }
 
+  private static class Parts {
+    final String location;
+    final Map<String, Collection<Tuple>> parts = new HashMap<String, Collection<Tuple>>();
+
+    public Parts(String location) {
+      super();
+      this.location = location;
+    }
+
+    public void set(String partFile, Collection<Tuple> data) {
+      if (parts.put(partFile, data) != null) {
+        throw new RuntimeException("the part " + partFile + " for location " + location + " already exists");
+      }
+    }
+
+    public List<Tuple> getAll() {
+        List<Tuple> all = new ArrayList<Tuple>();
+        Set<Entry<String, Collection<Tuple>>> entrySet = parts.entrySet();
+        for (Entry<String, Collection<Tuple>> entry : entrySet) {
+            all.addAll(entry.getValue());
+        }
+        return all;
+    }
+
+  }
+  
   /**
    * An isolated data store to avoid side effects
    *
    */
   public static class Data implements Serializable {
     private static final long serialVersionUID = 1L;
-    private Map<String, Collection<Tuple>> locationToData = new HashMap<String, Collection<Tuple>>();
+    private Map<String, Parts> locationToData = new HashMap<String, Parts>();
     private Map<String, Schema> locationToSchema = new HashMap<String, Schema>();
 
     /**
@@ -213,7 +243,9 @@ public class Storage extends LoadFunc im
      */
     public void set(String location, Schema schema, Collection<Tuple> data) {
       set(location, data);
-      locationToSchema.put(location, schema);
+      if (locationToSchema.put(location, schema) != null) {
+          throw new RuntimeException("schema already set for location "+location);
+      }
     }
 
     /**
@@ -233,8 +265,30 @@ public class Storage extends LoadFunc im
      * @param location "where" to store the tuples
      * @param data the tuples to store
      */
+    private void setInternal(String location, String partID, Collection<Tuple> data) {
+        Parts parts = locationToData.get(location);
+        if (partID == null) {
+            if (parts == null) {
+                partID = "mock";
+            } else {
+                throw new RuntimeException("Can not set location " + location + " twice");
+            }
+        }
+        if (parts == null) {
+            parts = new Parts(location);
+            locationToData.put(location, parts);
+        }
+        parts.set(partID, data);
+    }
+
+    /**
+     * to set the data in a location
+     *
+     * @param location "where" to store the tuples
+     * @param data the tuples to store
+     */
     public void set(String location, Collection<Tuple> data) {
-      locationToData.put(location, data);
+      setInternal(location, null, data);
     }
 
     /**
@@ -244,7 +298,7 @@ public class Storage extends LoadFunc im
      * @param data the tuples to store
      */
     public void set(String location, Tuple... data) {
-      set(location, Arrays.asList(data));
+        set(location, Arrays.asList(data));
     }
     
     /**
@@ -256,8 +310,7 @@ public class Storage extends LoadFunc im
       if (!locationToData.containsKey(location)) {
         throw new RuntimeException("No data for location '" + location + "'");
       }
-      Collection<Tuple> collection = locationToData.get(location);
-	return collection instanceof List ? (List<Tuple>)collection : new ArrayList<Tuple>(collection);
+      return locationToData.get(location).getAll();
     }
 
     /**
@@ -286,9 +339,8 @@ public class Storage extends LoadFunc im
   
   private Schema schema;
 
-  private List<Tuple> dataBeingWritten;
-
   private Iterator<Tuple> dataBeingRead;
+private MockRecordWriter mockRecordWriter;
 
   private void init(String location, Job job) throws IOException {
 	  this.data = getData(job);
@@ -385,13 +437,13 @@ public class Storage extends LoadFunc im
 
   @Override
   public void prepareToWrite(@SuppressWarnings("rawtypes") RecordWriter writer) throws IOException {
-    this.dataBeingWritten = new ArrayList<Tuple>();
-    this.data.set(location, dataBeingWritten);
+      mockRecordWriter = (MockRecordWriter) writer;
+      this.data.setInternal(location, mockRecordWriter.partID, mockRecordWriter.dataBeingWritten);
   }
 
   @Override
   public void putNext(Tuple t) throws IOException {
-    this.dataBeingWritten.add(t);
+      mockRecordWriter.dataBeingWritten.add(t);
   }
 
   @Override
@@ -517,8 +569,16 @@ public class Storage extends LoadFunc im
   // mocks for StoreFunc
   private static final class MockRecordWriter extends RecordWriter<Object, Object> {
 
+    private final List<Tuple> dataBeingWritten = new ArrayList<Tuple>();
+    private final String partID;
+
+    public MockRecordWriter(String partID) {
+        super();
+        this.partID = partID;
+    }
+
     @Override
-    public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     }
 
     @Override
@@ -567,7 +627,7 @@ public class Storage extends LoadFunc im
     @Override
     public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext arg0) throws IOException,
     InterruptedException {
-      return new MockRecordWriter();
+      return new MockRecordWriter(getUniqueFile(arg0, "part", ".mock"));
     }
 
   }
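
A usage note on the new guards above (a hedged sketch, not part of the patch): Data.set() now throws "Can not set location ... twice" when the same location is set a second time, so a test that reuses a location for a second scenario should start from a fresh Data via resetData rather than overwrite in place. A minimal sketch, assuming the same imports and static imports as TestMockStorage below:

    PigServer pigServer = new PigServer(ExecType.LOCAL);
    Data data = resetData(pigServer);
    data.set("input", tuple("a"), tuple("b"));
    // ... run the first pipeline against 'input' and check its output ...

    // Re-setting 'input' on the same Data would now throw
    // "Can not set location input twice"; reset to a clean store instead.
    data = resetData(pigServer);
    data.set("input", tuple("x"), tuple("y"));
    // ... run the second pipeline ...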

Modified: pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java?rev=1366193&r1=1366192&r2=1366193&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/mock/TestMockStorage.java Thu Jul 26 20:49:09 2012
@@ -3,10 +3,14 @@ package org.apache.pig.builtin.mock;
 import static junit.framework.Assert.*;
 import static org.apache.pig.builtin.mock.Storage.*;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.Utils;
@@ -50,11 +54,101 @@ public class TestMockStorage {
     pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
 
     assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
-    
+
     List<Tuple> out = data.get("bar");
     assertEquals(tuple("a", "a"), out.get(0));
     assertEquals(tuple("b", "b"), out.get(1));
     assertEquals(tuple("c", "c"), out.get(2));
   }
 
+  @Test
+  public void testMockStoreUnion() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    Data data = resetData(pigServer);
+
+    data.set("input1",
+        tuple("a"),
+        tuple("b"),
+        tuple("c")
+        );
+
+    data.set("input2",
+            tuple("d"),
+            tuple("e"),
+            tuple("f")
+            );
+
+    pigServer.registerQuery("A = LOAD 'input1' USING mock.Storage();");
+    pigServer.registerQuery("B = LOAD 'input2' USING mock.Storage();");
+    pigServer.registerQuery("C = UNION A, B;");
+    pigServer.registerQuery("STORE C INTO 'output' USING mock.Storage();");
+
+    List<Tuple> out = data.get("output");
+    assertEquals(out + " size", 6, out.size());
+    Set<String> set = new HashSet<String>();
+    for (Tuple tuple : out) {
+        if (!set.add((String)tuple.get(0))) {
+            fail(tuple.get(0) + " is present twice in " + out);
+        }
+    }
+
+    assertTrue(set + " contains a", set.contains("a"));
+    assertTrue(set + " contains b", set.contains("b"));
+    assertTrue(set + " contains c", set.contains("c"));
+    assertTrue(set + " contains d", set.contains("d"));
+    assertTrue(set + " contains e", set.contains("e"));
+    assertTrue(set + " contains f", set.contains("f"));
+  }
+  
+  @Test
+  public void testBadUsage1() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    Data data = resetData(pigServer);
+
+    data.set("input1",
+            tuple("a"),
+            tuple("b"),
+            tuple("c")
+            );
+
+    try {
+        data.set("input1",
+                tuple("d"),
+                tuple("e"),
+                tuple("f")
+                );
+        fail("should have thrown an exception for setting twice the same input");
+    } catch (RuntimeException e) {
+        assertEquals("Can not set location input1 twice", e.getMessage());
+    }
+  }
+  
+  @Test
+  public void testBadUsage2() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    Data data = resetData(pigServer);
+
+    data.set("input",
+        tuple("a"),
+        tuple("b"),
+        tuple("c")
+        );
+
+    pigServer.setBatchOn();
+    pigServer.registerQuery(
+         "A = LOAD 'input' USING mock.Storage();"
+        +"B = LOAD 'input' USING mock.Storage();"
+        +"STORE A INTO 'output' USING mock.Storage();"
+        +"STORE B INTO 'output' USING mock.Storage();");
+    List<ExecJob> results = pigServer.executeBatch();
+    boolean failed = false;
+    for (ExecJob execJob : results) {
+        if (execJob.getStatus() == JOB_STATUS.FAILED) {
+            failed = true;
+            break;
+        }
+    }
+    assertTrue("job should have failed for storing twice in the same location", failed);
+
+  }
 }
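
A follow-up note on the read path (an observation, not part of the patch): Parts.getAll() walks a HashMap, so the relative order of tuples coming from different writers is unspecified, although insertion order is kept within a single writer's part. Tests that store a UNION through mock.Storage are therefore safer comparing contents rather than positions, as testMockStoreUnion above does with a Set. An equivalent sketch using a sorted comparison (assuming the same test setup and the usual java.util imports):

    List<Tuple> out = data.get("output");
    List<String> actual = new ArrayList<String>();
    for (Tuple t : out) {
      // first field of each single-field tuple; ordering across parts is not guaranteed
      actual.add((String) t.get(0));
    }
    Collections.sort(actual);
    assertEquals(Arrays.asList("a", "b", "c", "d", "e", "f"), actual);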