You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/03/30 14:40:26 UTC

svn commit: r1670078 - in /pig/trunk: ./ src/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/builtin/mock/ test/org/apache/pig/test/

Author: rohini
Date: Mon Mar 30 12:40:26 2015
New Revision: 1670078

URL: http://svn.apache.org/r1670078
Log:
PIG-4480: Pig script failure on Tez with split and order by due to missing sample collection (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
    pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
    pig/trunk/src/pig-default.properties
    pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 30 12:40:26 2015
@@ -58,6 +58,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4480: Pig script failure on Tez with split and order by due to missing sample collection (rohini)
+
 PIG-4484: Ant pull jetty-6.1.26.zip on some platform (daijy)
 
 PIG-4479: Pig script with union within nested splits followed by join failed on Tez (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Mon Mar 30 12:40:26 2015
@@ -103,6 +103,14 @@ public class POReservoirSample extends P
                 rowProcessed++;
             } else if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
+            } else if (res.returnStatus == POStatus.STATUS_EOP) {
+                if (this.parentPlan.endOfAllInput) {
+                    break;
+                } else {
+                    // In case of Split can get EOP in between.
+                    // Return here instead of setting lastSample to EOP in getSample
+                    return res;
+                }
             } else {
                 break;
             }

Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Mon Mar 30 12:40:26 2015
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -102,7 +103,7 @@ import org.apache.pig.parser.ParserExcep
  *  pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
  *
  *  assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
- *  
+ *
  *  List<Tuple> out = data.get("bar");
  *  assertEquals(tuple("a", "a"), out.get(0));
  *  assertEquals(tuple("b", "b"), out.get(1));
@@ -132,7 +133,7 @@ public class Storage extends LoadFunc im
   public static DataBag bag(Tuple... tuples) {
     return new NonSpillableDataBag(Arrays.asList(tuples));
   }
-  
+
   /**
    * @param schema
    * @return the schema represented by the string
@@ -193,7 +194,8 @@ public class Storage extends LoadFunc im
 
   private static class Parts {
     final String location;
-    final Map<String, Collection<Tuple>> parts = new HashMap<String, Collection<Tuple>>();
+    // TreeMap to read part files in order
+    final Map<String, Collection<Tuple>> parts = new TreeMap<String, Collection<Tuple>>();
 
     public Parts(String location) {
       super();
@@ -216,7 +218,7 @@ public class Storage extends LoadFunc im
     }
 
   }
-  
+
   /**
    * An isolated data store to avoid side effects
    *
@@ -249,7 +251,7 @@ public class Storage extends LoadFunc im
     public void set(String location, String schema, Tuple... data) throws ParserException {
       set(location, Utils.getSchemaFromString(schema), Arrays.asList(data));
     }
-    
+
     /**
      * to set the data in a location with a known schema
      *
@@ -316,7 +318,7 @@ public class Storage extends LoadFunc im
     public void set(String location, Tuple... data) {
         set(location, Arrays.asList(data));
     }
-    
+
     /**
      *
      * @param location
@@ -330,7 +332,7 @@ public class Storage extends LoadFunc im
     }
 
     /**
-     * 
+     *
      * @param location
      * @return the schema stored in this location
      */
@@ -352,7 +354,7 @@ public class Storage extends LoadFunc im
   private String location;
 
   private Data data;
-  
+
   private Schema schema;
 
   private Iterator<Tuple> dataBeingRead;
@@ -403,9 +405,9 @@ private MockRecordWriter mockRecordWrite
   public void setUDFContextSignature(String signature) {
     super.setUDFContextSignature(signature);
   }
-  
+
   // LoadMetaData
-  
+
   @Override
   public ResourceSchema getSchema(String location, Job job) throws IOException {
 	init(location, job);
@@ -477,7 +479,7 @@ private MockRecordWriter mockRecordWrite
   }
 
   // StoreMetaData
-  
+
   @Override
   public void storeStatistics(ResourceStatistics stats, String location, Job job)
   		throws IOException {
@@ -490,7 +492,7 @@ private MockRecordWriter mockRecordWrite
 	init(location, job);
 	data.setSchema(location, Schema.getPigSchema(schema));
   }
-  
+
   // Mocks for LoadFunc
 
   private static class MockRecordReader extends RecordReader<Object, Object> {

Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Mon Mar 30 12:40:26 2015
@@ -57,4 +57,4 @@ pig.sql.type=hcat
 pig.output.committer.recovery.support=false
 
 pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
-pig.stats.output.size.reader.unsupported=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
+pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon Mar 30 12:40:26 2015
@@ -28,6 +28,7 @@ import java.util.Properties;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.junit.After;
@@ -37,7 +38,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.apache.pig.builtin.mock.Storage;
 
 @RunWith(JUnit4.class)
 public class TestMultiQuery {
@@ -838,6 +838,37 @@ public class TestMultiQuery {
         iter.next().toString().equals("(world,{(2,world)})");
     }
 
+    @Test
+    public void testMultiQueryJiraPig4480() throws Exception {
+
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation",
+                Storage.tuple(1, Storage.bag(Storage.tuple("hello"), Storage.tuple("world"), Storage.tuple("program"))),
+                Storage.tuple(2, Storage.bag(Storage.tuple("my"), Storage.tuple("world"))));
+
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:bag{(c:chararray)});");
+        myPig.registerQuery("A = foreach A generate a, flatten(b);");
+        myPig.registerQuery("A1 = foreach A generate a;");
+        myPig.registerQuery("A1 = distinct A1;");
+        myPig.registerQuery("A2 = filter A by c == 'world';");
+        myPig.registerQuery("A2 = ORDER A2 by a parallel 2;");
+        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
+        myPig.registerQuery("store A2 into 'output2' using mock.Storage();");
+
+        myPig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output1");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(1)", "(2)"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+
+        actualResults = data.get("output2");
+        expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(1, 'world')", "(2, 'world')"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
     // --------------------------------------------------------------------------
     // Helper methods