You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/01 08:22:26 UTC
svn commit: r1573129 - in /pig/branches/tez: src/org/apache/pig/ test/
test/org/apache/pig/test/
Author: cheolsoo
Date: Sat Mar 1 07:22:25 2014
New Revision: 1573129
URL: http://svn.apache.org/r1573129
Log:
PIG-3784: Port more mini cluster tests to Tez (cheolsoo)
Modified:
pig/branches/tez/src/org/apache/pig/PigServer.java
pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java
pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java
pig/branches/tez/test/org/apache/pig/test/TestPigContext.java
pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java
pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java
pig/branches/tez/test/tez-tests
Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Sat Mar 1 07:22:25 2014
@@ -244,13 +244,8 @@ public class PigServer {
addJarsFromProperties();
markPredeployedJarsFromProperties();
- if (PigStats.get() == null) {
- PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
- }
-
- if (ScriptState.get() == null) {
- ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
- }
+ PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+ ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
}
private void addJarsFromProperties() throws ExecException {
Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Sat Mar 1 07:22:25 2014
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Properties;
import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
@@ -48,7 +47,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestCombiner {
-
private static MiniGenericCluster cluster;
private static Properties properties;
@@ -74,14 +72,6 @@ public class TestCombiner {
FileLocalizer.setInitialized(false);
}
- @After
- public void tearDown() throws Exception {
- // Nullify PigStats and ScriptState after every run to ensure new
- // objects are instantiated for next run.
- PigStats.start(null);
- ScriptState.start(null);
- }
-
@Test
public void testSuccessiveUserFuncs1() throws Exception {
String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
Modified: pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java Sat Mar 1 07:22:25 2014
@@ -47,7 +47,6 @@ import org.junit.Ignore;
import org.junit.Test;
public class TestCustomPartitioner {
-
private static MiniGenericCluster cluster;
private static Properties properties;
private static PigServer pigServer;
Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline.java Sat Mar 1 07:22:25 2014
@@ -28,14 +28,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
-import junit.framework.Assert;
-
-import org.apache.pig.ComparisonFunc;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
@@ -56,32 +53,37 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-@RunWith(JUnit4.class)
public class TestEvalPipeline {
-
- static MiniCluster cluster = MiniCluster.buildCluster();
- private PigServer pigServer;
- private PigContext pigContext;
-
- TupleFactory mTf = TupleFactory.getInstance();
- BagFactory mBf = BagFactory.getInstance();
-
+ private static PigServer pigServer;
+ private static PigContext pigContext;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
+ private TupleFactory mTf = TupleFactory.getInstance();
+ private BagFactory mBf = BagFactory.getInstance();
+
@Before
public void setUp() throws Exception{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), properties);
pigContext = pigServer.getPigContext();
}
-
+
+ @BeforeClass
+ public static void oneTimeSetup() {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
static public class MyBagFunction extends EvalFunc<DataBag>{
@Override
public DataBag exec(Tuple input) throws IOException {
@@ -93,35 +95,35 @@ public class TestEvalPipeline {
return output;
}
}
-
+
@Test
public void testFunctionInsideFunction() throws Exception{
File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
- pigServer.registerQuery("a = load '"
- + Util.generateURI(f1.toString(), pigContext)
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(f1.toString(), pigContext)
+ "' using " + PigStorage.class.getName() + "(':');");
pigServer.registerQuery("b = foreach a generate 1-1/1;");
Iterator<Tuple> iter = pigServer.openIterator("b");
-
+
for (int i=0 ;i<3; i++){
- Assert.assertEquals(DataType.toDouble(iter.next().get(0)), 0.0);
+ Assert.assertEquals(0.0d, DataType.toDouble(iter.next().get(0)).doubleValue(), 0.0d);
}
}
-
+
@Test
public void testJoin() throws Exception{
File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
File f2 = Util.createFile(new String[]{"b","b","a"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(f1.toString(), pigContext) + "' using "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(f1.toString(), pigContext) + "' using "
+ PigStorage.class.getName() + "(':');");
- pigServer.registerQuery("b = load '"
+ pigServer.registerQuery("b = load '"
+ Util.generateURI(f2.toString(), pigContext) + "';");
- pigServer.registerQuery("c = cogroup a by $0, b by $0;");
+ pigServer.registerQuery("c = cogroup a by $0, b by $0;");
pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
-
+
Iterator<Tuple> iter = pigServer.openIterator("d");
int count = 0;
while(iter.hasNext()){
@@ -131,7 +133,7 @@ public class TestEvalPipeline {
}
Assert.assertEquals(count, 4);
}
-
+
@Test
public void testDriverMethod() throws Exception{
File f = Util.createTempFileDelOnExit("tmp", "");
@@ -139,8 +141,8 @@ public class TestEvalPipeline {
pw.println("a");
pw.println("a");
pw.close();
- pigServer.registerQuery("a = foreach (load '"
- + Util.generateURI(f.toString(), pigContext) + "') "
+ pigServer.registerQuery("a = foreach (load '"
+ + Util.generateURI(f.toString(), pigContext) + "') "
+ "generate 1, flatten(" + MyBagFunction.class.getName() + "(*));");
Iterator<Tuple> iter = pigServer.openIterator("a");
int count = 0;
@@ -153,61 +155,61 @@ public class TestEvalPipeline {
Assert.assertEquals(count, 6);
f.delete();
}
-
+
@Test
public void testMapLookup() throws Exception {
DataBag b = BagFactory.getInstance().newDefaultBag();
Map<String, Object> colors = new HashMap<String, Object>();
colors.put("apple","red");
colors.put("orange","orange");
-
+
Map<String, Object> weights = new HashMap<String, Object>();
weights.put("apple","0.1");
weights.put("orange","0.3");
-
+
Tuple t = mTf.newTuple();
t.append(colors);
t.append(weights);
b.add(t);
-
+
File tmpFile = File.createTempFile("tmp", "");
tmpFile.delete(); // we only needed the temp file name, so delete the file
String fileName = Util.removeColon(tmpFile.getAbsolutePath());
PigFile f = new PigFile(fileName);
f.store(b, new FuncSpec(BinStorage.class.getCanonicalName()),
- pigServer.getPigContext());
-
+ pigServer.getPigContext());
+
pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();");
pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
Iterator<Tuple> iter = pigServer.openIterator("b");
t = iter.next();
Assert.assertEquals(t.get(0).toString(), "red");
- Assert.assertEquals(DataType.toDouble(t.get(1)), 0.3);
+ Assert.assertEquals(0.3d, DataType.toDouble(t.get(1)).doubleValue(), 0.0d);
Assert.assertFalse(iter.hasNext());
Util.deleteFile(cluster, fileName);
}
-
+
static public class TitleNGrams extends EvalFunc<DataBag> {
-
+
@Override
- public DataBag exec(Tuple input) throws IOException {
+ public DataBag exec(Tuple input) throws IOException {
try {
DataBag output = BagFactory.getInstance().newDefaultBag();
String str = input.get(0).toString();
-
+
String title = str;
if (title != null) {
List<String> nGrams = makeNGrams(title);
-
+
for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
Tuple t = TupleFactory.getInstance().newTuple(1);
t.set(0, it.next());
output.add(t);
}
}
-
+
return output;
} catch (ExecException ee) {
IOException ioe = new IOException(ee.getMessage());
@@ -215,28 +217,28 @@ public class TestEvalPipeline {
throw ioe;
}
}
-
-
+
+
List<String> makeNGrams(String str) {
List<String> tokens = new ArrayList<String>();
-
+
StringTokenizer st = new StringTokenizer(str);
while (st.hasMoreTokens())
tokens.add(st.nextToken());
-
+
return nGramHelper(tokens, new ArrayList<String>());
}
-
+
ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
if (str.size() == 0)
return nGrams;
-
+
for (int i = 0; i < str.size(); i++)
nGrams.add(makeString(str.subList(0, i+1)));
-
+
return nGramHelper(str.subList(1, str.size()), nGrams);
}
-
+
String makeString(List<String> list) {
StringBuffer sb = new StringBuffer();
for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
@@ -288,18 +290,18 @@ public class TestEvalPipeline {
myMap.put("map", mapInMap);
myMap.put("tuple", tuple);
myMap.put("bag", bag);
- return myMap;
+ return myMap;
}
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.MAP));
}
}
-
+
@Test
public void testBagFunctionWithFlattening() throws Exception{
File queryLogFile = Util.createFile(
- new String[]{
+ new String[]{
"stanford\tdeer\tsighting",
"bush\tpresident",
"stanford\tbush",
@@ -309,66 +311,52 @@ public class TestEvalPipeline {
"stanford\tpresident",
}
);
-
+
File newsFile = Util.createFile(
new String[]{
"deer seen at stanford",
- "george bush visits stanford",
- "yahoo hosting a conference in the bay area",
+ "george bush visits stanford",
+ "yahoo hosting a conference in the bay area",
"who will win the world cup"
}
- );
-
+ );
+
Map<String, Integer> expectedResults = new HashMap<String, Integer>();
expectedResults.put("bush", 2);
expectedResults.put("stanford", 3);
expectedResults.put("world", 1);
expectedResults.put("conference", 1);
-
- pigServer.registerQuery("newsArticles = LOAD '"
- + Util.generateURI(newsFile.toString(), pigContext)
+
+ pigServer.registerQuery("newsArticles = LOAD '"
+ + Util.generateURI(newsFile.toString(), pigContext)
+ "' USING " + TextLoader.class.getName() + "();");
- pigServer.registerQuery("queryLog = LOAD '"
+ pigServer.registerQuery("queryLog = LOAD '"
+ Util.generateURI(queryLogFile.toString(), pigContext) + "';");
pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("answer");
if(!iter.hasNext()) Assert.fail("No Output received");
while(iter.hasNext()){
Tuple t = iter.next();
- Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
+ Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),
+ (DataType.toDouble(t.get(0))).doubleValue(), 0.0d);
}
}
-
- /*
- @Test
- public void testSort() throws Exception{
- testSortDistinct(false, false);
- }
- */
@Test
- public void testSortWithUDF() throws Exception{
- testSortDistinct(false, true);
- }
+ public void testSort() throws Exception{
+ testSortDistinct(false);
+ }
@Test
public void testDistinct() throws Exception{
- testSortDistinct(true, false);
+ testSortDistinct(true);
}
-
- public static class TupComp extends ComparisonFunc {
- @Override
- public int compare(Tuple t1, Tuple t2) {
- return t1.compareTo(t2);
- }
- }
-
- private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{
+ private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
int LOOP_SIZE = 1024*16;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -376,18 +364,14 @@ public class TestEvalPipeline {
for(int i = 0; i < LOOP_SIZE; i++) {
ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
}
- ps.close();
-
- pigServer.registerQuery("A = LOAD '"
+ ps.close();
+
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
if (eliminateDuplicates){
pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
}else{
- if(!useUDF) {
- pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
- } else {
- pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
- }
+ pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
}
Iterator<Tuple> iter = pigServer.openIterator("B");
String last = "";
@@ -404,9 +388,9 @@ public class TestEvalPipeline {
Assert.assertEquals(t.size(), 2);
last = t.get(0).toString();
}
- }
+ }
}
-
+
@Test
public void testNestedPlan() throws Exception{
int LOOP_COUNT = 10;
@@ -420,7 +404,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
@@ -460,7 +444,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
@@ -502,7 +486,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = limit A 5;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -514,15 +498,15 @@ public class TestEvalPipeline {
}
Assert.assertEquals(5, numIdentity);
}
-
+
@Test
public void testComplexData() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
Iterator<Tuple> it = pigServer.openIterator("b");
@@ -532,10 +516,10 @@ public class TestEvalPipeline {
Assert.assertEquals("2", t.get(2).toString());
Assert.assertEquals("value1", t.get(3).toString());
Assert.assertEquals("value2", t.get(4).toString());
-
+
//test with BinStorage
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
String output = "/pig/out/TestEvalPipeline-testComplexData";
pigServer.deleteFile(output);
@@ -549,17 +533,17 @@ public class TestEvalPipeline {
Assert.assertEquals("1", t.get(1).toString());
Assert.assertEquals("2", t.get(2).toString());
Assert.assertEquals("value1", t.get(3).toString());
- Assert.assertEquals("value2", t.get(4).toString());
+ Assert.assertEquals("value2", t.get(4).toString());
}
@Test
public void testBinStorageDetermineSchema() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
Iterator<Tuple> it = pigServer.openIterator("b");
@@ -569,10 +553,10 @@ public class TestEvalPipeline {
Assert.assertEquals(2, t.get(2));
Assert.assertEquals("value1", t.get(3).toString());
Assert.assertEquals("value2", t.get(4).toString());
-
+
//test with BinStorage
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
pigServer.deleteFile(output);
@@ -587,7 +571,7 @@ public class TestEvalPipeline {
String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b as t2b, m#'key1', m#'key2', b;",
"q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
"q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};
-
+
for (int i = 0; i < loads.length; i++) {
pigServer.registerQuery(loads[i]);
pigServer.registerQuery(generates[i]);
@@ -605,18 +589,18 @@ public class TestEvalPipeline {
for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
Tuple bt = bit.next();
Assert.assertEquals(String.class, bt.get(0).getClass());
- Assert.assertEquals(String.class, bt.get(1).getClass());
+ Assert.assertEquals(String.class, bt.get(1).getClass());
}
- }
+ }
}
@Test
public void testProjectBag() throws IOException, ExecException {
// This tests make sure that when a bag with multiple columns is
// projected all columns apear in the output
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"f1\tf2\tf3"});
- pigServer.registerQuery("a = load '"
+ pigServer.registerQuery("a = load '"
+ Util.generateURI(input.toString(), pigContext) + "' as (x, y, z);");
pigServer.registerQuery("b = group a by x;");
pigServer.registerQuery("c = foreach b generate flatten(a.(y, z));");
@@ -630,11 +614,11 @@ public class TestEvalPipeline {
@Test
public void testBinStorageDetermineSchema2() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"pigtester\t10\t1.2"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (name:chararray, age:int, gpa:double);");
String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
pigServer.deleteFile(output);
@@ -649,7 +633,7 @@ public class TestEvalPipeline {
String[] generates = {"q = foreach p generate name, age, gpa;",
"q = foreach p generate name, age, gpa;",
"q = foreach p generate $0, $1, $2;"};
-
+
for (int i = 0; i < loads.length; i++) {
pigServer.registerQuery(loads[i]);
pigServer.registerQuery(generates[i]);
@@ -662,7 +646,7 @@ public class TestEvalPipeline {
Assert.assertEquals(1.2, t.get(2));
Assert.assertEquals(Double.class, t.get(2).getClass());
}
-
+
// test that valid casting is allowed
pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
" as (name, age:long, gpa:float);");
@@ -675,7 +659,7 @@ public class TestEvalPipeline {
Assert.assertEquals(Long.class, t.get(1).getClass());
Assert.assertEquals(1.2f, t.get(2));
Assert.assertEquals(Float.class, t.get(2).getClass());
-
+
// test that implicit casts work
pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
" as (name, age, gpa);");
@@ -689,15 +673,15 @@ public class TestEvalPipeline {
Assert.assertEquals(1, t.get(2));
Assert.assertEquals(Integer.class, t.get(2).getClass());
}
-
+
@Test
public void testCogroupWithInputFromGroup() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
- new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
"pigtester2\t10\t1.2",
"pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});
-
+
Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
// we will in essence be doing a group on first column and getting
// SUM over second column and a count for the group - store
@@ -705,13 +689,13 @@ public class TestEvalPipeline {
resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
- pigServer.registerQuery("c = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ pigServer.registerQuery("c = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("d = cogroup b by group, c by name;");
pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
@@ -719,27 +703,28 @@ public class TestEvalPipeline {
for(int i = 0; i < resultMap.size(); i++) {
Tuple t = it.next();
Assert.assertEquals(true, resultMap.containsKey(t.get(0)));
- Pair<Long, Long> output = resultMap.get(t.get(0));
+ Pair<Long, Long> output = resultMap.get(t.get(0));
Assert.assertEquals(output.first, t.get(1));
Assert.assertEquals(output.second, t.get(2));
}
}
-
+
@Test
public void testUtf8Dump() throws IOException, ExecException {
-
+
// Create input file with unicode data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"wendyξ"});
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext)
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext)
+ "' using PigStorage() " + "as (name:chararray);");
Iterator<Tuple> it = pigServer.openIterator("a");
Tuple t = it.next();
Assert.assertEquals("wendyξ", t.get(0));
-
+
}
+ @SuppressWarnings("unchecked")
@Test
public void testMapUDF() throws Exception{
int LOOP_COUNT = 2;
@@ -753,7 +738,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
String query = "C = foreach B {"
@@ -793,7 +778,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
String query = "C = foreach B {"
@@ -811,19 +796,19 @@ public class TestEvalPipeline {
@Test
public void testLoadCtorArgs() throws IOException, ExecException {
-
+
// Create input file
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"hello:world"});
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext)
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext)
+ "' using org.apache.pig.test.PigStorageNoDefCtor(':');");
pigServer.registerQuery("b = foreach a generate (chararray)$0, (chararray)$1;");
Iterator<Tuple> it = pigServer.openIterator("b");
Tuple t = it.next();
Assert.assertEquals("hello", t.get(0));
Assert.assertEquals("world", t.get(1));
-
+
}
@Test
@@ -839,7 +824,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
@@ -881,7 +866,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A;");
String query = "C = foreach B {"
@@ -928,7 +913,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A;");
String query = "C = foreach B {"
@@ -969,7 +954,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A ;"); //the argument does not matter
pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter
@@ -990,7 +975,7 @@ public class TestEvalPipeline {
Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
}
-
+
@Test
public void testCogroupAfterDistinct() throws Exception {
String[] input1 = {
@@ -1011,13 +996,13 @@ public class TestEvalPipeline {
};
Util.createInputFile(cluster, "table1", input1);
Util.createInputFile(cluster, "table2", input2);
-
+
pigServer.registerQuery("nonuniqtable1 = LOAD 'table1' AS (f1:chararray);");
pigServer.registerQuery("table1 = DISTINCT nonuniqtable1;");
pigServer.registerQuery("table2 = LOAD 'table2' AS (f1:chararray, f2:int);");
pigServer.registerQuery("temp = COGROUP table1 BY f1 INNER, table2 BY f1;");
Iterator<Tuple> it = pigServer.openIterator("temp");
-
+
// results should be:
// (abc,{(abc)},{})
// (def,{(def)},{})
@@ -1025,7 +1010,7 @@ public class TestEvalPipeline {
HashMap<String, Tuple> results = new HashMap<String, Tuple>();
Object[] row = new Object[] { "abc",
Util.createBagOfOneColumn(new String[] { "abc"}), mBf.newDefaultBag() };
- results.put("abc", Util.createTuple(row));
+ results.put("abc", Util.createTuple(row));
row = new Object[] { "def",
Util.createBagOfOneColumn(new String[] { "def"}), mBf.newDefaultBag() };
results.put("def", Util.createTuple(row));
@@ -1044,16 +1029,16 @@ public class TestEvalPipeline {
Assert.assertEquals(expected.get(i++), field);
}
}
-
+
Util.deleteFile(cluster, "table1");
Util.deleteFile(cluster, "table2");
}
@Test
public void testAlgebraicDistinctProgress() throws Exception {
-
+
//creating a test input of larger than 1000 to make
- //sure that progress kicks in. The only way to test this
+ //sure that progress kicks in. The only way to test this
//is to add a log statement to the getDistinct
//method in Distinct.java. There is no automated mechanism
//to check this from pig
@@ -1066,66 +1051,66 @@ public class TestEvalPipeline {
inpString[i] = new Integer(i/2).toString();
inpString[i+1] = new Integer(i/2).toString();
}
-
+
Util.createInputFile(cluster, "table", inpString);
pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
pigServer.registerQuery("b = group a ALL;");
pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
Iterator<Tuple> it = pigServer.openIterator("c");
-
+
Integer[] exp = new Integer[inputSize/2];
for(int j = 0; j < inputSize/2; ++j) {
exp[j] = j;
}
DataBag expectedBag = Util.createBagOfOneColumn(exp);
-
+
while(it.hasNext()) {
Tuple tup = it.next();
Long resultBagSize = (Long)tup.get(0);
Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
}
-
- Util.deleteFile(cluster, "table");
+
+ Util.deleteFile(cluster, "table");
}
@Test
public void testBinStorageWithLargeStrings() throws Exception {
// Create input file with large strings
- int testSize = 100;
- String[] stringArray = new String[testSize];
- Random random = new Random();
- stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
- for(int i = 1; i < stringArray.length; ++i) {
- //generate a few large strings every 25th record
- if((i % 25) == 0) {
- stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
- } else {
- stringArray[i] = GenRandomData.genRandString(random);
- }
- }
-
- Util.createInputFile(cluster, "table", stringArray);
-
- //test with BinStorage
+ int testSize = 100;
+ String[] stringArray = new String[testSize];
+ Random random = new Random();
+ stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
+ for(int i = 1; i < stringArray.length; ++i) {
+ //generate a few large strings every 25th record
+ if((i % 25) == 0) {
+ stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
+ } else {
+ stringArray[i] = GenRandomData.genRandString(random);
+ }
+ }
+
+ Util.createInputFile(cluster, "table", stringArray);
+
+ //test with BinStorage
pigServer.registerQuery("a = load 'table' using PigStorage() " +
"as (c: chararray);");
String output = "/pig/out/TestEvalPipeline-testBinStorageLargeStrings";
pigServer.deleteFile(output);
pigServer.store("a", output, BinStorage.class.getName());
-
+
pigServer.registerQuery("b = load '" + output +"' using BinStorage() " +
- "as (c:chararray);");
+ "as (c:chararray);");
pigServer.registerQuery("c = foreach b generate c;");
-
+
Iterator<Tuple> it = pigServer.openIterator("c");
int counter = 0;
while(it.hasNext()) {
Tuple tup = it.next();
String resultString = (String)tup.get(0);
String expectedString = stringArray[counter];
- Assert.assertTrue(expectedString.equals(resultString));
+ Assert.assertTrue(expectedString.equals(resultString));
++counter;
}
Util.deleteFile(cluster, "table");
Modified: pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestNestedForeach.java Sat Mar 1 07:22:25 2014
@@ -18,172 +18,182 @@
package org.apache.pig.test;
+import java.io.IOException;
import java.util.Iterator;
+import java.util.Properties;
-import junit.framework.Assert;
-
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestNestedForeach {
- static MiniCluster cluster = MiniCluster.buildCluster();
+ private static PigServer pig ;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
+ @Before
+ public void setup() throws IOException {
+ pig = new PigServer(cluster.getExecType(), properties);
+ }
+
+ @BeforeClass
+ public static void oneTimeSetup() {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
+ @Test
+ public void testNestedForeachProj() throws Exception {
+ String[] input = {
+ "1\t2",
+ "2\t7",
+ "1\t3"
+ };
+
+ Util.createInputFile(cluster, "table_nf_proj", input);
+
+ pig.registerQuery("a = load 'table_nf_proj' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate a1; generate c1; }\n");
- private PigServer pig ;
+ Iterator<Tuple> iter = pig.openIterator("c");
+ String[] expected = new String[] {"({(2),(3)})", "({(7)})"};
- public TestNestedForeach() throws Throwable {
- pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()) ;
- }
-
- Boolean[] nullFlags = new Boolean[]{ false, true };
-
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
- }
-
- @Test
- public void testNestedForeachProj() throws Exception {
- String[] input = {
- "1\t2",
- "2\t7",
- "1\t3"
- };
-
- Util.createInputFile(cluster, "table_nf_proj", input);
-
- pig.registerQuery("a = load 'table_nf_proj' as (a0:int, a1:int);\n");
- pig.registerQuery("b = group a by a0;\n");
- pig.registerQuery("c = foreach b { c1 = foreach a generate a1; generate c1; }\n");
-
- Iterator<Tuple> iter = pig.openIterator("c");
- String[] expected = new String[] {"({(2),(3)})", "({(7)})"};
-
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
-
- }
-
- @Test
- public void testNestedForeachExpression() throws Exception {
- String[] input = {
- "1\t2",
- "2\t7",
- "1\t3"
- };
-
- Util.createInputFile(cluster, "table_nf_expr", input);
-
- pig.registerQuery("a = load 'table_nf_expr' as (a0:int, a1:int);\n");
- pig.registerQuery("b = group a by a0;\n");
- pig.registerQuery("c = foreach b { c1 = foreach a generate 2 * a1; generate c1; }\n");
+ Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+
+ }
+
+ @Test
+ public void testNestedForeachExpression() throws Exception {
+ String[] input = {
+ "1\t2",
+ "2\t7",
+ "1\t3"
+ };
+
+ Util.createInputFile(cluster, "table_nf_expr", input);
+
+ pig.registerQuery("a = load 'table_nf_expr' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate 2 * a1; generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
- Iterator<Tuple> iter = pig.openIterator("c");
-
String[] expected = new String[] {"({(4),(6)})", "({(14)})"};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
- }
+ Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+ }
- @Test
- public void testNestedForeachUDF() throws Exception {
- String[] input = {
- "1\thello",
- "2\tpig",
- "1\tworld"
- };
-
- Util.createInputFile(cluster, "table_nf_udf", input);
-
- pig.registerQuery("a = load 'table_nf_udf' as (a0:int, a1:chararray);\n");
- pig.registerQuery("b = group a by a0;\n");
- pig.registerQuery("c = foreach b { c1 = foreach a generate UPPER(a1); generate c1; }\n");
+ @Test
+ public void testNestedForeachUDF() throws Exception {
+ String[] input = {
+ "1\thello",
+ "2\tpig",
+ "1\tworld"
+ };
+
+ Util.createInputFile(cluster, "table_nf_udf", input);
+
+ pig.registerQuery("a = load 'table_nf_udf' as (a0:int, a1:chararray);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate UPPER(a1); generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
- Iterator<Tuple> iter = pig.openIterator("c");
-
String[] expected = new String[] {"({(HELLO),(WORLD)})", "({(PIG)})"};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
- }
+ Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+ }
- @Test
- public void testNestedForeachFlatten() throws Exception {
- String[] input = {
- "1\thello world pig",
- "2\thadoop world",
- "1\thello pig"
- };
-
- Util.createInputFile(cluster, "table_nf_flatten", input);
-
- pig.registerQuery("a = load 'table_nf_flatten' as (a0:int, a1:chararray);\n");
- pig.registerQuery("b = group a by a0;\n");
- pig.registerQuery("c = foreach b { c1 = foreach a generate FLATTEN(TOKENIZE(a1)); generate c1; }\n");
-
- Iterator<Tuple> iter = pig.openIterator("c");
-
- String[] expected = new String[] {"({(hello),(world),(pig),(hello),(pig)})",
+ @Test
+ public void testNestedForeachFlatten() throws Exception {
+ String[] input = {
+ "1\thello world pig",
+ "2\thadoop world",
+ "1\thello pig"
+ };
+
+ Util.createInputFile(cluster, "table_nf_flatten", input);
+
+ pig.registerQuery("a = load 'table_nf_flatten' as (a0:int, a1:chararray);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { c1 = foreach a generate FLATTEN(TOKENIZE(a1)); generate c1; }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+
+ String[] expected = new String[] {"({(hello),(world),(pig),(hello),(pig)})",
"({(hadoop),(world)})"};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
- }
+ Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("c")));
+ }
+
+ @Test
+ public void testNestedForeachInnerFilter() throws Exception {
+ String[] input = {
+ "1\t2",
+ "2\t7",
+ "1\t3"
+ };
+
+ Util.createInputFile(cluster, "table_nf_filter", input);
+
+ pig.registerQuery("a = load 'table_nf_filter' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { " +
+ " c1 = filter a by a1 >= 3; " +
+ " c2 = foreach c1 generate a1; " +
+ " generate c2; " +
+ " }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(3)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(7)})"));
+ }
+
+ @Test
+ public void testNestedForeachInnerOrder() throws Exception {
+ String[] input = {
+ "1\t3",
+ "2\t7",
+ "1\t2"
+ };
+
+ Util.createInputFile(cluster, "table_nf_order", input);
+
+ pig.registerQuery("a = load 'table_nf_order' as (a0:int, a1:int);\n");
+ pig.registerQuery("b = group a by a0;\n");
+ pig.registerQuery("c = foreach b { " +
+ " c1 = order a by a1; " +
+ " c2 = foreach c1 generate a1; " +
+ " generate c2; " +
+ " }\n");
+
+ Iterator<Tuple> iter = pig.openIterator("c");
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(2),(3)})"));
+
+ t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(7)})"));
+ }
- @Test
- public void testNestedForeachInnerFilter() throws Exception {
- String[] input = {
- "1\t2",
- "2\t7",
- "1\t3"
- };
-
- Util.createInputFile(cluster, "table_nf_filter", input);
-
- pig.registerQuery("a = load 'table_nf_filter' as (a0:int, a1:int);\n");
- pig.registerQuery("b = group a by a0;\n");
- pig.registerQuery("c = foreach b { " +
- " c1 = filter a by a1 >= 3; " +
- " c2 = foreach c1 generate a1; " +
- " generate c2; " +
- " }\n");
-
- Iterator<Tuple> iter = pig.openIterator("c");
- Tuple t = iter.next();
- Assert.assertTrue(t.toString().equals("({(3)})"));
-
- t = iter.next();
- Assert.assertTrue(t.toString().equals("({(7)})"));
- }
-
- @Test
- public void testNestedForeachInnerOrder() throws Exception {
- String[] input = {
- "1\t3",
- "2\t7",
- "1\t2"
- };
-
- Util.createInputFile(cluster, "table_nf_order", input);
-
- pig.registerQuery("a = load 'table_nf_order' as (a0:int, a1:int);\n");
- pig.registerQuery("b = group a by a0;\n");
- pig.registerQuery("c = foreach b { " +
- " c1 = order a by a1; " +
- " c2 = foreach c1 generate a1; " +
- " generate c2; " +
- " }\n");
-
- Iterator<Tuple> iter = pig.openIterator("c");
- Tuple t = iter.next();
- Assert.assertTrue(t.toString().equals("({(2),(3)})"));
-
- t = iter.next();
- Assert.assertTrue(t.toString().equals("({(7)})"));
- }
-
- // See PIG-2563
- @Test
+ // See PIG-2563
+ @Test
public void testNestedForeach() throws Exception {
String[] input = {
"1\t2\t3",
Modified: pig/branches/tez/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigContext.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigContext.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigContext.java Sat Mar 1 07:22:25 2014
@@ -34,6 +34,8 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.util.JavaCompilerHelper;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -45,13 +47,16 @@ public class TestPigContext {
private static final String FS_NAME = "file:///";
private static final String JOB_TRACKER = "local";
+ private static PigContext pigContext;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
private File input;
- private PigContext pigContext;
- static MiniCluster cluster = null;
@BeforeClass
public static void oneTimeSetup() {
- cluster = MiniCluster.buildCluster();
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
}
@Before
@@ -60,6 +65,11 @@ public class TestPigContext {
input = File.createTempFile("PigContextTest-", ".txt");
}
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
/**
* Passing an already configured pigContext in PigServer constructor.
*/
@@ -142,8 +152,7 @@ public class TestPigContext {
int status = Util.executeJavaCommand("jar -cf " + jarFile +
" -C " + tmpDir.getAbsolutePath() + " " + "com");
assertEquals(0, status);
- Properties properties = cluster.getProperties();
- PigContext localPigContext = new PigContext(ExecType.MAPREDUCE, properties);
+ PigContext localPigContext = new PigContext(cluster.getExecType(), properties);
// register jar using properties
localPigContext.getProperties().setProperty("pig.additional.jars", jarFile);
@@ -216,16 +225,6 @@ public class TestPigContext {
pc.getScriptFiles().remove("test/path-1824");
}
- @After
- public void tearDown() throws Exception {
- input.delete();
- }
-
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
- }
-
private static Properties getProperties() {
Properties props = new Properties();
props.put("mapred.job.tracker", JOB_TRACKER);
Modified: pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigServer.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigServer.java Sat Mar 1 07:22:25 2014
@@ -72,9 +72,12 @@ import org.apache.pig.impl.util.Properti
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
@@ -83,15 +86,14 @@ import org.w3c.dom.NodeList;
import com.google.common.io.Files;
public class TestPigServer {
- private PigServer pig = null;
- static MiniCluster cluster = MiniCluster.buildCluster();
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
private File tempDir;
@Before
public void setUp() throws Exception{
FileLocalizer.setInitialized(false);
- pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
tempDir = Files.createTempDir();
tempDir.deleteOnExit();
registerNewResource(tempDir.getAbsolutePath());
@@ -99,10 +101,15 @@ public class TestPigServer {
@After
public void tearDown() throws Exception{
- pig = null;
tempDir.delete();
}
+ @BeforeClass
+ public static void oneTimeSetup() {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
@@ -119,9 +126,9 @@ public class TestPigServer {
if (url.toString().contains(name)) {
if (!included) {
fail("Included is false, but url ["+url+"] contains name ["+name+"]");
- }
+ }
assertEquals("Too many urls contain name: " + name, 1, ++count);
- }
+ }
}
if (included) {
assertEquals("Number of urls that contain name [" + name + "] != 1", 1, count);
@@ -150,8 +157,7 @@ public class TestPigServer {
URL urlToAdd = new File(file).toURI().toURL();
URLClassLoader sysLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
Method addMethod = URLClassLoader.class.
- getDeclaredMethod("addURL",
- new Class[]{URL.class});
+ getDeclaredMethod("addURL", new Class[]{URL.class});
addMethod.setAccessible(true);
addMethod.invoke(sysLoader, new Object[]{urlToAdd});
}
@@ -166,6 +172,7 @@ public class TestPigServer {
String jarName = "BadFileNameTestJarNotPresent.jar";
// jar name is not present to start with
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
boolean raisedException = false;
try {
@@ -192,9 +199,10 @@ public class TestPigServer {
createFakeJarFile(jarLocation, jarName);
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
- pig.registerJar(jarLocation + jarName);
+ pig.registerJar(jarLocation + jarName);
verifyStringContained(pig.getPigContext().extraJars, jarName, true);
// clean-up
@@ -221,12 +229,13 @@ public class TestPigServer {
createFakeJarFile(jarLocation1, jarName);
createFakeJarFile(jarLocation2, jarName);
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
registerNewResource(jarLocation1);
registerNewResource(jarLocation2);
- pig.registerJar(jarName);
+ pig.registerJar(jarName);
verifyStringContained(pig.getPigContext().extraJars, jarName, true);
// clean-up
@@ -288,6 +297,7 @@ public class TestPigServer {
// load the specific resource
boolean exceptionRaised = false;
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
try {
pig.registerJar("sub_dir/TestRegisterJar.class");
}
@@ -314,7 +324,8 @@ public class TestPigServer {
createFakeJarFile(jarLocation, jar1Name);
createFakeJarFile(jarLocation, jar2Name);
- pig.registerJar(jarLocation + "TestRegisterJarGlobbing*.jar");
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+ pig.registerJar(jarLocation + "TestRegisterJarGlobbing*.jar");
verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
@@ -335,7 +346,8 @@ public class TestPigServer {
createFakeJarFile(jarLocation, jar2Name);
String currentDir = System.getProperty("user.dir");
- pig.registerJar(new File(currentDir, dir) + FILE_SEPARATOR + "TestRegisterJarGlobbing*.jar");
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+ pig.registerJar(new File(currentDir, dir) + FILE_SEPARATOR + "TestRegisterJarGlobbing*.jar");
verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
@@ -360,7 +372,8 @@ public class TestPigServer {
// depend on configuration
String absPath = fs.getFileStatus(new Path(jarLocation)).getPath().toString();
- pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+ pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
@@ -370,6 +383,7 @@ public class TestPigServer {
}
public void testDescribeLoad() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
Schema dumpedSchema = pig.dumpSchema("a") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
@@ -378,6 +392,7 @@ public class TestPigServer {
@Test
public void testDescribeFilter() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = filter a by field1 > 10;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -387,6 +402,7 @@ public class TestPigServer {
@Test
public void testDescribeDistinct() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = distinct a ;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -396,6 +412,7 @@ public class TestPigServer {
@Test
public void testDescribeSort() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = order a by * desc;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -405,6 +422,7 @@ public class TestPigServer {
@Test
public void testDescribeLimit() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = limit a 10;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -414,6 +432,7 @@ public class TestPigServer {
@Test
public void testDescribeForeach() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = foreach a generate field1 + 10;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -423,7 +442,7 @@ public class TestPigServer {
@Test
public void testDescribeForeachFail() throws Throwable {
-
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = foreach a generate field1 + 10;") ;
try {
@@ -436,6 +455,7 @@ public class TestPigServer {
@Test
public void testDescribeForeachNoSchema() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' ;") ;
pig.registerQuery("b = foreach a generate *;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -444,6 +464,7 @@ public class TestPigServer {
@Test
public void testDescribeCogroup() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = cogroup a by field1, b by field4;") ;
@@ -454,6 +475,7 @@ public class TestPigServer {
@Test
public void testDescribeCross() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = cross a, b;") ;
@@ -464,6 +486,7 @@ public class TestPigServer {
@Test
public void testDescribeJoin() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = join a by field1, b by field4;") ;
@@ -474,6 +497,7 @@ public class TestPigServer {
@Test
public void testDescribeUnion() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = union a, b;") ;
@@ -484,6 +508,7 @@ public class TestPigServer {
@Test
public void testDescribeTuple2Elem() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: int, field3: int );") ;
pig.registerQuery("b = foreach a generate field1, (field2, field3);") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -493,6 +518,7 @@ public class TestPigServer {
@Test
public void testDescribeComplex() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
@@ -504,6 +530,7 @@ public class TestPigServer {
}
private void registerScalarScript(boolean useScalar, String expectedSchemaStr) throws IOException {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("A = load 'adata' AS (a: int, b: int);");
//scalar
pig.registerQuery("C = FOREACH A GENERATE *;");
@@ -533,6 +560,12 @@ public class TestPigServer {
@Test
public void testExplainXmlComplex() throws Throwable {
+ // TODO: Explain XML output is not supported in non-MR mode. Remove the
+ // following condition once it's implemented in Tez.
+ if (cluster.getExecType() != ExecType.MAPREDUCE) {
+ return;
+ }
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
pig.registerQuery("c = group b by site;");
@@ -614,6 +647,7 @@ public class TestPigServer {
String absPath = fs.getFileStatus(new Path(scriptName)).getPath().toString();
Util.createInputFile(cluster, "testRegisterRemoteScript_input", new String[]{"1", "2"});
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerCode(absPath, "jython", "pig");
pig.registerQuery("a = load 'testRegisterRemoteScript_input';");
pig.registerQuery("b = foreach a generate pig.helloworld($0);");
@@ -631,26 +665,6 @@ public class TestPigServer {
assertFalse(iter.hasNext());
}
-
- // PIG-3469
- @Test
- public void testNonExistingSecondDirectoryInSkewJoin() throws Exception {
- String script =
- "exists = LOAD 'test/org/apache/pig/test/data/InputFiles/jsTst1.txt' AS (x:chararray, a:long);" +
- "missing = LOAD '/non/existing/directory' AS (a:long);" +
- "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
- "joined = JOIN exists BY a, missing BY a USING 'skewed';" +
- "STORE joined INTO '/tmp/test_out.tsv';";
-
- PigServer pig = new PigServer(ExecType.LOCAL);
- // Execution of the script should fail, but without throwing any exceptions (such as NPE)
- try {
- pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
- } catch(Exception ex) {
- fail("Unexpected exception: " + ex);
- }
- }
-
@Test
public void testParamSubstitution() throws Exception{
// using params map
@@ -798,6 +812,7 @@ public class TestPigServer {
@Test
public void testDescribeForEachFlatten() throws Throwable {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a';") ;
pig.registerQuery("b = group a by $0;") ;
pig.registerQuery("c = foreach b generate flatten(a);") ;
@@ -807,7 +822,8 @@ public class TestPigServer {
@Test // PIG-2059
public void test1() throws Throwable {
- pig.setValidateEachStatement(true);
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+ pig.setValidateEachStatement(true);
pig.registerQuery("A = load 'x' as (u, v);") ;
try {
pig.registerQuery("B = foreach A generate $2;") ;
@@ -820,40 +836,40 @@ public class TestPigServer {
}
@Test
- public void testDefaultPigProperties() throws Throwable {
- //Test with PigServer
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
- Properties properties = pigServer.getPigContext().getProperties();
+ public void testDefaultPigProperties() throws Throwable {
+ //Test with PigServer
+ PigServer pigServer = new PigServer(cluster.getExecType());
+ Properties properties = pigServer.getPigContext().getProperties();
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
- //Test with properties file
- File propertyFile = new File(tempDir, "pig.properties");
+ //Test with properties file
+ File propertyFile = new File(tempDir, "pig.properties");
- properties = PropertiesUtil.loadDefaultProperties();
+ properties = PropertiesUtil.loadDefaultProperties();
- assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
+ assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
- PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
- out.println("aggregate.warning=false");
- out.println("opt.multiquery=false");
- out.println("stop.on.failure=true");
-
- out.close();
-
- properties = PropertiesUtil.loadDefaultProperties();
- assertEquals("false", properties.getProperty("aggregate.warning"));
- assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
- assertEquals("true", properties.getProperty("stop.on.failure"));
+ PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
+ out.println("aggregate.warning=false");
+ out.println("opt.multiquery=false");
+ out.println("stop.on.failure=true");
+
+ out.close();
- propertyFile.delete();
- }
+ properties = PropertiesUtil.loadDefaultProperties();
+ assertEquals("false", properties.getProperty("aggregate.warning"));
+ assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY));
+ assertEquals("true", properties.getProperty("stop.on.failure"));
+
+ propertyFile.delete();
+ }
@Test
public void testSecondarySort() throws Exception {
Modified: pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java Sat Mar 1 07:22:25 2014
@@ -18,7 +18,6 @@
package org.apache.pig.test;
-import static org.apache.pig.ExecType.MAPREDUCE;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.*;
@@ -26,16 +25,13 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
@@ -52,6 +48,8 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.ScriptState;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -59,18 +57,15 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestPigStorage {
-
- protected final Log log = LogFactory.getLog(getClass());
-
- private static MiniCluster cluster = MiniCluster.buildCluster();
- static PigServer pig;
- static final String datadir = "build/test/tmpdata/";
-
- PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
- Map<String, String> fileNameMap = new HashMap<String, String>();
+ private static PigServer pig;
+ private static PigContext pigContext;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+ private static final String datadir = "build/test/tmpdata/";
@Before
public void setup() throws IOException {
@@ -99,6 +94,13 @@ public class TestPigStorage {
pig.shutdown();
}
+ @BeforeClass
+ public static void oneTimeSetup() {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ pigContext = new PigContext(ExecType.LOCAL, new Properties());
+ }
+
@AfterClass
public static void shutdown() {
cluster.shutDown();
@@ -120,11 +122,11 @@ public class TestPigStorage {
// This tests PigStorage loader with records exactly
// on the boundary of the file blocks.
Properties props = new Properties();
- for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+ for (Entry<Object, Object> entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
props.setProperty("mapred.max.split.size", "20");
- PigServer pigServer = new PigServer(MAPREDUCE, props);
+ PigServer pigServer = new PigServer(cluster.getExecType(), props);
String[] inputs = {
"abcdefgh1", "abcdefgh2", "abcdefgh3",
"abcdefgh4", "abcdefgh5", "abcdefgh6",
@@ -191,7 +193,7 @@ public class TestPigStorage {
inputFileName,
new String[] {"1\t2\t3", "4", "5\t6\t7"});
String script = "a = load '" + inputFileName + "' as (i:int, j:int, k:int);" +
- "b = foreach a generate j, k;";
+ "b = foreach a generate j, k;";
Util.registerMultiLineQuery(pig, script);
Iterator<Tuple> it = pig.openIterator("b");
assertEquals(Util.createTuple(new Integer[] { 2, 3}), it.next());
@@ -261,9 +263,9 @@ public class TestPigStorage {
"as (f1:chararray, f2:int);";
pig.registerQuery(query);
pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
-
+
// aout now has a schema.
-
+
// Verify that loaded data has the correct data type after the prune
pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t'); c = FOREACH b GENERATE f2;");
@@ -521,40 +523,41 @@ public class TestPigStorage {
pig.mkdirs(globtestdir+"b");
} catch (IOException e) {};
+ Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
// if schema file is not found, schema is null
- ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(conf));
Assert.assertTrue(schema==null);
// if .pig_schema is in the input directory
putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
- schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(conf));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"a/a0/.pig_schema").delete();
// .pig_schema in one of globStatus returned directory
putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
- schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"a/.pig_schema").delete();
putSchemaFile(globtestdir+"b/.pig_schema", testSchema);
- schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"b/.pig_schema").delete();
// if .pig_schema is deep in the globbing, it will not get used
putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
- schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
Assert.assertTrue(schema==null);
putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
- schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(conf));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"a/a0/.pig_schema").delete();
new File(globtestdir+"a/.pig_schema").delete();
pigStorage = new PigStorage("\t", "-schema");
putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
- schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(conf));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Sat Mar 1 07:22:25 2014
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
@@ -35,6 +37,7 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
@@ -44,6 +47,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
import org.junit.Before;
@@ -95,7 +99,7 @@ public class TestSkewedJoin {
int k = 0;
for(int j=0; j<120; j++) {
- w.println("100\tapple1\taaa" + k);
+ w.println("100\tapple1\taaa" + k);
k++;
w.println("200\torange1\tbbb" + k);
k++;
@@ -532,4 +536,83 @@ public class TestSkewedJoin {
}
}
}
+
+    // PIG-3469
+    // This query should fail with nothing else but InvalidInputException.
+    // The test fails if the query succeeds, or if InvalidInputException is
+    // found neither in the exception chain nor in the Pig log file.
+    @Test
+    public void testNonExistingInputPathInSkewJoin() throws Exception {
+        String script =
+            "exists = LOAD '" + INPUT_FILE2 + "' AS (a:long, x:chararray);" +
+            "missing = LOAD '/non/existing/directory' AS (a:long);" +
+            "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
+            "joined = JOIN exists BY a, missing BY a USING 'skewed';";
+
+        String logFile = Util.createTempFileDelOnExit("tmp", ".log").getAbsolutePath();
+        String oldValue = (String) properties.setProperty("pig.logfile", logFile);
+
+        try {
+            pigServer.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
+            pigServer.openIterator("joined");
+            // fail() throws AssertionError, which the catch block below does
+            // not intercept, so a query that unexpectedly succeeds is reported.
+            fail("Skewed join on a non-existing input path should have failed");
+        } catch (Exception e) {
+            boolean foundInvalidInputException = false;
+
+            // Search through chained exceptions for InvalidInputException. If
+            // input splits are calculated on the front-end, we will see this
+            // exception in the stack trace. Start at e itself so that a
+            // directly thrown InvalidInputException is recognized too.
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof InvalidInputException) {
+                    foundInvalidInputException = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            // InvalidInputException was not found in the stack trace. But it's
+            // possible that the exception was thrown in the back-end, and Pig
+            // couldn't retrieve it in the front-end. To be safe, search the log
+            // file before declaring a failure.
+            if (!foundInvalidInputException) {
+                File logFileHandle = new File(logFile);
+                byte[] buffer = new byte[(int) logFileHandle.length()];
+                int filled = 0;
+                FileInputStream fis = new FileInputStream(logFileHandle);
+                try {
+                    // A single read() may return fewer bytes than requested,
+                    // so keep reading until the buffer is full or EOF.
+                    while (filled < buffer.length) {
+                        int n = fis.read(buffer, filled, buffer.length - filled);
+                        if (n < 0) {
+                            break;
+                        }
+                        filled += n;
+                    }
+                } finally {
+                    fis.close();
+                }
+                String str = new String(buffer, 0, filled, "UTF-8");
+                if (str.contains(InvalidInputException.class.getName())) {
+                    foundInvalidInputException = true;
+                }
+            }
+
+            assertTrue("This exception was not caused by InvalidInputException: " + e,
+                    foundInvalidInputException);
+        } finally {
+            // Restore the previous setting; if there was none, remove the
+            // property so the temporary log file does not leak into other tests.
+            if (oldValue != null) {
+                properties.setProperty("pig.logfile", oldValue);
+            } else {
+                properties.remove("pig.logfile");
+            }
+        }
+    }
+
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java Sat Mar 1 07:22:25 2014
@@ -23,7 +23,6 @@ import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Properties;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
Modified: pig/branches/tez/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/tez-tests?rev=1573129&r1=1573128&r2=1573129&view=diff
==============================================================================
--- pig/branches/tez/test/tez-tests (original)
+++ pig/branches/tez/test/tez-tests Sat Mar 1 07:22:25 2014
@@ -6,5 +6,10 @@
**/TestSkewedJoin.java
**/TestSplitStore.java
**/TestCustomPartitioner.java
+**/TestPigContext.java
+**/TestPigStorage.java
+**/TestNestedForeach.java
+**/TestEvalPipeline.java
+**/TestPigServer.java
## TODO: Runs fine individually. Hangs with file.out.index not found when run together. Likely Tez Bug
##**/TestSecondarySortTez.java