You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@pig.apache.org by ro...@apache.org on 2015/03/30 14:40:26 UTC
svn commit: r1670078 - in /pig/trunk: ./ src/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/builtin/mock/ test/org/apache/pig/test/
Author: rohini
Date: Mon Mar 30 12:40:26 2015
New Revision: 1670078
URL: http://svn.apache.org/r1670078
Log:
PIG-4480: Pig script failure on Tez with split and order by due to missing sample collection (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
pig/trunk/src/pig-default.properties
pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 30 12:40:26 2015
@@ -58,6 +58,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4480: Pig script failure on Tez with split and order by due to missing sample collection (rohini)
+
PIG-4484: Ant pull jetty-6.1.26.zip on some platform (daijy)
PIG-4479: Pig script with union within nested splits followed by join failed on Tez (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Mon Mar 30 12:40:26 2015
@@ -103,6 +103,14 @@ public class POReservoirSample extends P
rowProcessed++;
} else if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
+ } else if (res.returnStatus == POStatus.STATUS_EOP) {
+ if (this.parentPlan.endOfAllInput) {
+ break;
+ } else {
+ // In case of Split can get EOP in between.
+ // Return here instead of setting lastSample to EOP in getSample
+ return res;
+ }
} else {
break;
}
Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Mon Mar 30 12:40:26 2015
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -102,7 +103,7 @@ import org.apache.pig.parser.ParserExcep
* pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
*
* assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
- *
+ *
* List<Tuple> out = data.get("bar");
* assertEquals(tuple("a", "a"), out.get(0));
* assertEquals(tuple("b", "b"), out.get(1));
@@ -132,7 +133,7 @@ public class Storage extends LoadFunc im
public static DataBag bag(Tuple... tuples) {
return new NonSpillableDataBag(Arrays.asList(tuples));
}
-
+
/**
* @param schema
* @return the schema represented by the string
@@ -193,7 +194,8 @@ public class Storage extends LoadFunc im
private static class Parts {
final String location;
- final Map<String, Collection<Tuple>> parts = new HashMap<String, Collection<Tuple>>();
+ // TreeMap to read part files in order
+ final Map<String, Collection<Tuple>> parts = new TreeMap<String, Collection<Tuple>>();
public Parts(String location) {
super();
@@ -216,7 +218,7 @@ public class Storage extends LoadFunc im
}
}
-
+
/**
* An isolated data store to avoid side effects
*
@@ -249,7 +251,7 @@ public class Storage extends LoadFunc im
public void set(String location, String schema, Tuple... data) throws ParserException {
set(location, Utils.getSchemaFromString(schema), Arrays.asList(data));
}
-
+
/**
* to set the data in a location with a known schema
*
@@ -316,7 +318,7 @@ public class Storage extends LoadFunc im
public void set(String location, Tuple... data) {
set(location, Arrays.asList(data));
}
-
+
/**
*
* @param location
@@ -330,7 +332,7 @@ public class Storage extends LoadFunc im
}
/**
- *
+ *
* @param location
* @return the schema stored in this location
*/
@@ -352,7 +354,7 @@ public class Storage extends LoadFunc im
private String location;
private Data data;
-
+
private Schema schema;
private Iterator<Tuple> dataBeingRead;
@@ -403,9 +405,9 @@ private MockRecordWriter mockRecordWrite
public void setUDFContextSignature(String signature) {
super.setUDFContextSignature(signature);
}
-
+
// LoadMetaData
-
+
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
init(location, job);
@@ -477,7 +479,7 @@ private MockRecordWriter mockRecordWrite
}
// StoreMetaData
-
+
@Override
public void storeStatistics(ResourceStatistics stats, String location, Job job)
throws IOException {
@@ -490,7 +492,7 @@ private MockRecordWriter mockRecordWrite
init(location, job);
data.setSchema(location, Schema.getPigSchema(schema));
}
-
+
// Mocks for LoadFunc
private static class MockRecordReader extends RecordReader<Object, Object> {
Modified: pig/trunk/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/trunk/src/pig-default.properties?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/src/pig-default.properties (original)
+++ pig/trunk/src/pig-default.properties Mon Mar 30 12:40:26 2015
@@ -57,4 +57,4 @@ pig.sql.type=hcat
pig.output.committer.recovery.support=false
pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
-pig.stats.output.size.reader.unsupported=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
+pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1670078&r1=1670077&r2=1670078&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon Mar 30 12:40:26 2015
@@ -28,6 +28,7 @@ import java.util.Properties;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.junit.After;
@@ -37,7 +38,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.apache.pig.builtin.mock.Storage;
@RunWith(JUnit4.class)
public class TestMultiQuery {
@@ -838,6 +838,37 @@ public class TestMultiQuery {
iter.next().toString().equals("(world,{(2,world)})");
}
+ @Test
+ public void testMultiQueryJiraPig4480() throws Exception {
+
+ Storage.Data data = Storage.resetData(myPig);
+ data.set("inputLocation",
+ Storage.tuple(1, Storage.bag(Storage.tuple("hello"), Storage.tuple("world"), Storage.tuple("program"))),
+ Storage.tuple(2, Storage.bag(Storage.tuple("my"), Storage.tuple("world"))));
+
+ myPig.setBatchOn();
+ myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:bag{(c:chararray)});");
+ myPig.registerQuery("A = foreach A generate a, flatten(b);");
+ myPig.registerQuery("A1 = foreach A generate a;");
+ myPig.registerQuery("A1 = distinct A1;");
+ myPig.registerQuery("A2 = filter A by c == 'world';");
+ myPig.registerQuery("A2 = ORDER A2 by a parallel 2;");
+ myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
+ myPig.registerQuery("store A2 into 'output2' using mock.Storage();");
+
+ myPig.executeBatch();
+
+ List<Tuple> actualResults = data.get("output1");
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"(1)", "(2)"});
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+
+ actualResults = data.get("output2");
+ expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"(1, 'world')", "(2, 'world')"});
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
// --------------------------------------------------------------------------
// Helper methods