You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC
svn commit: r1598702 [17/23] - in /pig/trunk: ./ ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/ba...
Added: pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestCustomPartitioner.java Fri May 30 19:07:23 2014
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the {@code PARTITION BY <partitioner class>} clause on JOIN, GROUP,
+ * DISTINCT and CROSS, running each query on a {@link MiniGenericCluster}.
+ */
+public class TestCustomPartitioner {
+    private static MiniGenericCluster cluster;
+    private static Properties properties;
+    private static PigServer pigServer;
+    private static FileSystem fs;
+
+    @Before
+    public void setUp() throws Exception {
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(cluster.getExecType(), properties);
+    }
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        properties = cluster.getProperties();
+        fs = cluster.getFileSystem();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerParseJoins() throws Exception {
+        String[] input = {
+                "1\t3",
+                "1\t2"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
+
+        // A custom partitioner is not allowed for skewed joins: registering the
+        // query must fail with a FrontendException.
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+            pigServer.registerQuery("B = ORDER A by $0;");
+            pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+            // control should not reach here
+            Assert.fail("Skewed join cannot accept a custom partitioner");
+        } catch (FrontendException e) {
+            Assert.assertTrue("Unexpected error message: " + e.getMessage(),
+                    e.getMessage().contains("Custom Partitioner is not supported for skewed join"));
+        }
+
+        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        assertSameRecordsAnyOrder(pigServer.openIterator("hash"),
+                "(1,3,1,2)", "(1,3,1,3)", "(1,2,1,2)", "(1,2,1,3)");
+
+        // No checks are made for merged and replicated joins as they are compiled to a map only job
+        // No frontend error checking has been added for these jobs, hence not adding any test cases
+        // Manually tested the sanity once. Above test should cover the basic sanity of the scenario
+
+        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerGroups() throws Exception {
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "3\t1",
+                "4\t1"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
+
+        String outputDir = "tmp_testCustomPartitionerGroup";
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
+        // For a map reduce job the total number of partitions is the same as the
+        // number of reduce tasks, so the job needs more than one reducer for the
+        // partitioner to matter. The query asks for PARALLEL 2 and the checks
+        // below assume exactly two reducers. SimpleCustomPartitioner3 simply
+        // returns 1 (second reducer) for all inputs when the number of
+        // partitions is bigger than 1.
+        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", outputDir);
+
+        // 4 distinct group keys, all expected in the second partition.
+        assertAllRecordsInSecondPartition(outputDir, 4);
+
+        Util.deleteDirectory(new File(outputDir));
+        Util.deleteFile(cluster, outputDir);
+        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+    }
+
+    // See PIG-3385
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception {
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        String outputDir = "tmp_testCustomPartitionerDistinct";
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", outputDir);
+
+        // 5 input rows collapse to 4 distinct rows; SimpleCustomPartitioner3
+        // sends all of them to the *second* reducer.
+        assertAllRecordsInSecondPartition(outputDir, 4);
+
+        Util.deleteDirectory(new File(outputDir));
+        Util.deleteFile(cluster, outputDir);
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerCross() throws Exception {
+        String[] input = {
+                "1\t3",
+                "1\t2",
+        };
+
+        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = ORDER A by $0;");
+        pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+
+        // Crossing two 2-row relations must yield each of the four
+        // combinations exactly once, with nothing extra.
+        assertSameRecordsAnyOrder(pigServer.openIterator("C"),
+                "(1,3,1,2)", "(1,3,1,3)", "(1,2,1,2)", "(1,2,1,3)");
+
+        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+    }
+
+    /**
+     * Drains {@code iter} and asserts that it produced exactly
+     * {@code expectedRecords}, compared via {@link Tuple#toString()} and
+     * ignoring order. Missing, extra or duplicated records fail the assertion.
+     */
+    private static void assertSameRecordsAnyOrder(Iterator<Tuple> iter, String... expectedRecords) {
+        List<String> expected = new ArrayList<String>(Arrays.asList(expectedRecords));
+        Collections.sort(expected);
+
+        List<String> actual = new ArrayList<String>();
+        while (iter.hasNext()) {
+            actual.add(iter.next().toString());
+        }
+        Collections.sort(actual);
+
+        Assert.assertEquals(expected, actual);
+    }
+
+    /**
+     * Asserts that the job output stored under {@code outputDir} on the cluster
+     * consists of exactly two part files, that the first one is empty and that
+     * the second one holds {@code expectedRecords} lines. Each part file is
+     * copied into a local directory named {@code outputDir} for inspection.
+     */
+    private static void assertAllRecordsInSecondPartition(String outputDir, int expectedRecords)
+            throws Exception {
+        new File(outputDir).mkdir();
+        FileStatus[] outputFiles = fs.listStatus(new Path(outputDir), Util.getSuccessMarkerPathFilter());
+        Assert.assertEquals("Expected one output file per reducer", 2, outputFiles.length);
+        // listStatus() does not promise an ordering; sort by path so that, with
+        // the usual part-*-NNNNN naming, index 0 is the first reducer's output.
+        Arrays.sort(outputFiles);
+
+        Assert.assertEquals("Partition 0 should be empty. Most likely Custom Partitioner was not used.",
+                0, copyAndCountLines(outputFiles[0], outputDir + "/" + 0));
+        Assert.assertEquals(expectedRecords, copyAndCountLines(outputFiles[1], outputDir + "/" + 1));
+    }
+
+    /**
+     * Copies {@code clusterFile} to {@code localPath} and returns the number of
+     * lines in the local copy. The reader is closed even if reading fails.
+     */
+    private static int copyAndCountLines(FileStatus clusterFile, String localPath) throws Exception {
+        Util.copyFromClusterToLocal(cluster, clusterFile.getPath().toString(), localPath);
+        BufferedReader reader = new BufferedReader(new FileReader(localPath));
+        try {
+            int count = 0;
+            while (reader.readLine() != null) {
+                count++;
+            }
+            return count;
+        } finally {
+            reader.close();
+        }
+    }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri May 30 19:07:23 2014
@@ -28,14 +28,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
-import junit.framework.Assert;
-
-import org.apache.pig.ComparisonFunc;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
@@ -56,32 +53,37 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-@RunWith(JUnit4.class)
public class TestEvalPipeline {
-
- static MiniCluster cluster = MiniCluster.buildCluster();
- private PigServer pigServer;
- private PigContext pigContext;
-
- TupleFactory mTf = TupleFactory.getInstance();
- BagFactory mBf = BagFactory.getInstance();
-
+ private static PigServer pigServer;
+ private static PigContext pigContext;
+ private static Properties properties;
+ private static MiniGenericCluster cluster;
+
+ private TupleFactory mTf = TupleFactory.getInstance();
+ private BagFactory mBf = BagFactory.getInstance();
+
@Before
public void setUp() throws Exception{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), properties);
pigContext = pigServer.getPigContext();
}
-
+
+ @BeforeClass
+ public static void oneTimeSetup() {
+ cluster = MiniGenericCluster.buildCluster();
+ properties = cluster.getProperties();
+ }
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
static public class MyBagFunction extends EvalFunc<DataBag>{
@Override
public DataBag exec(Tuple input) throws IOException {
@@ -93,35 +95,35 @@ public class TestEvalPipeline {
return output;
}
}
-
+
@Test
public void testFunctionInsideFunction() throws Exception{
File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
- pigServer.registerQuery("a = load '"
- + Util.generateURI(f1.toString(), pigContext)
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(f1.toString(), pigContext)
+ "' using " + PigStorage.class.getName() + "(':');");
pigServer.registerQuery("b = foreach a generate 1-1/1;");
Iterator<Tuple> iter = pigServer.openIterator("b");
-
+
for (int i=0 ;i<3; i++){
- Assert.assertEquals(DataType.toDouble(iter.next().get(0)), 0.0);
+ Assert.assertEquals(0.0d, DataType.toDouble(iter.next().get(0)).doubleValue(), 0.0d);
}
}
-
+
@Test
public void testJoin() throws Exception{
File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
File f2 = Util.createFile(new String[]{"b","b","a"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(f1.toString(), pigContext) + "' using "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(f1.toString(), pigContext) + "' using "
+ PigStorage.class.getName() + "(':');");
- pigServer.registerQuery("b = load '"
+ pigServer.registerQuery("b = load '"
+ Util.generateURI(f2.toString(), pigContext) + "';");
- pigServer.registerQuery("c = cogroup a by $0, b by $0;");
+ pigServer.registerQuery("c = cogroup a by $0, b by $0;");
pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
-
+
Iterator<Tuple> iter = pigServer.openIterator("d");
int count = 0;
while(iter.hasNext()){
@@ -131,7 +133,7 @@ public class TestEvalPipeline {
}
Assert.assertEquals(count, 4);
}
-
+
@Test
public void testDriverMethod() throws Exception{
File f = Util.createTempFileDelOnExit("tmp", "");
@@ -139,8 +141,8 @@ public class TestEvalPipeline {
pw.println("a");
pw.println("a");
pw.close();
- pigServer.registerQuery("a = foreach (load '"
- + Util.generateURI(f.toString(), pigContext) + "') "
+ pigServer.registerQuery("a = foreach (load '"
+ + Util.generateURI(f.toString(), pigContext) + "') "
+ "generate 1, flatten(" + MyBagFunction.class.getName() + "(*));");
Iterator<Tuple> iter = pigServer.openIterator("a");
int count = 0;
@@ -153,61 +155,61 @@ public class TestEvalPipeline {
Assert.assertEquals(count, 6);
f.delete();
}
-
+
@Test
public void testMapLookup() throws Exception {
DataBag b = BagFactory.getInstance().newDefaultBag();
Map<String, Object> colors = new HashMap<String, Object>();
colors.put("apple","red");
colors.put("orange","orange");
-
+
Map<String, Object> weights = new HashMap<String, Object>();
weights.put("apple","0.1");
weights.put("orange","0.3");
-
+
Tuple t = mTf.newTuple();
t.append(colors);
t.append(weights);
b.add(t);
-
+
File tmpFile = File.createTempFile("tmp", "");
tmpFile.delete(); // we only needed the temp file name, so delete the file
String fileName = Util.removeColon(tmpFile.getAbsolutePath());
PigFile f = new PigFile(fileName);
f.store(b, new FuncSpec(BinStorage.class.getCanonicalName()),
- pigServer.getPigContext());
-
+ pigServer.getPigContext());
+
pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();");
pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
Iterator<Tuple> iter = pigServer.openIterator("b");
t = iter.next();
Assert.assertEquals(t.get(0).toString(), "red");
- Assert.assertEquals(DataType.toDouble(t.get(1)), 0.3);
+ Assert.assertEquals(0.3d, DataType.toDouble(t.get(1)).doubleValue(), 0.0d);
Assert.assertFalse(iter.hasNext());
Util.deleteFile(cluster, fileName);
}
-
+
static public class TitleNGrams extends EvalFunc<DataBag> {
-
+
@Override
- public DataBag exec(Tuple input) throws IOException {
+ public DataBag exec(Tuple input) throws IOException {
try {
DataBag output = BagFactory.getInstance().newDefaultBag();
String str = input.get(0).toString();
-
+
String title = str;
if (title != null) {
List<String> nGrams = makeNGrams(title);
-
+
for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
Tuple t = TupleFactory.getInstance().newTuple(1);
t.set(0, it.next());
output.add(t);
}
}
-
+
return output;
} catch (ExecException ee) {
IOException ioe = new IOException(ee.getMessage());
@@ -215,28 +217,28 @@ public class TestEvalPipeline {
throw ioe;
}
}
-
-
+
+
List<String> makeNGrams(String str) {
List<String> tokens = new ArrayList<String>();
-
+
StringTokenizer st = new StringTokenizer(str);
while (st.hasMoreTokens())
tokens.add(st.nextToken());
-
+
return nGramHelper(tokens, new ArrayList<String>());
}
-
+
ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
if (str.size() == 0)
return nGrams;
-
+
for (int i = 0; i < str.size(); i++)
nGrams.add(makeString(str.subList(0, i+1)));
-
+
return nGramHelper(str.subList(1, str.size()), nGrams);
}
-
+
String makeString(List<String> list) {
StringBuffer sb = new StringBuffer();
for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
@@ -288,18 +290,18 @@ public class TestEvalPipeline {
myMap.put("map", mapInMap);
myMap.put("tuple", tuple);
myMap.put("bag", bag);
- return myMap;
+ return myMap;
}
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.MAP));
}
}
-
+
@Test
public void testBagFunctionWithFlattening() throws Exception{
File queryLogFile = Util.createFile(
- new String[]{
+ new String[]{
"stanford\tdeer\tsighting",
"bush\tpresident",
"stanford\tbush",
@@ -309,66 +311,52 @@ public class TestEvalPipeline {
"stanford\tpresident",
}
);
-
+
File newsFile = Util.createFile(
new String[]{
"deer seen at stanford",
- "george bush visits stanford",
- "yahoo hosting a conference in the bay area",
+ "george bush visits stanford",
+ "yahoo hosting a conference in the bay area",
"who will win the world cup"
}
- );
-
+ );
+
Map<String, Integer> expectedResults = new HashMap<String, Integer>();
expectedResults.put("bush", 2);
expectedResults.put("stanford", 3);
expectedResults.put("world", 1);
expectedResults.put("conference", 1);
-
- pigServer.registerQuery("newsArticles = LOAD '"
- + Util.generateURI(newsFile.toString(), pigContext)
+
+ pigServer.registerQuery("newsArticles = LOAD '"
+ + Util.generateURI(newsFile.toString(), pigContext)
+ "' USING " + TextLoader.class.getName() + "();");
- pigServer.registerQuery("queryLog = LOAD '"
+ pigServer.registerQuery("queryLog = LOAD '"
+ Util.generateURI(queryLogFile.toString(), pigContext) + "';");
pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("answer");
if(!iter.hasNext()) Assert.fail("No Output received");
while(iter.hasNext()){
Tuple t = iter.next();
- Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
+ Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),
+ (DataType.toDouble(t.get(0))).doubleValue(), 0.0d);
}
}
-
- /*
- @Test
- public void testSort() throws Exception{
- testSortDistinct(false, false);
- }
- */
@Test
- public void testSortWithUDF() throws Exception{
- testSortDistinct(false, true);
- }
+ public void testSort() throws Exception{
+ testSortDistinct(false);
+ }
@Test
public void testDistinct() throws Exception{
- testSortDistinct(true, false);
+ testSortDistinct(true);
}
-
- public static class TupComp extends ComparisonFunc {
- @Override
- public int compare(Tuple t1, Tuple t2) {
- return t1.compareTo(t2);
- }
- }
-
- private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{
+ private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
int LOOP_SIZE = 1024*16;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -376,18 +364,14 @@ public class TestEvalPipeline {
for(int i = 0; i < LOOP_SIZE; i++) {
ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
}
- ps.close();
-
- pigServer.registerQuery("A = LOAD '"
+ ps.close();
+
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
if (eliminateDuplicates){
pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
}else{
- if(!useUDF) {
- pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
- } else {
- pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
- }
+ pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
}
Iterator<Tuple> iter = pigServer.openIterator("B");
String last = "";
@@ -404,9 +388,9 @@ public class TestEvalPipeline {
Assert.assertEquals(t.size(), 2);
last = t.get(0).toString();
}
- }
+ }
}
-
+
@Test
public void testNestedPlan() throws Exception{
int LOOP_COUNT = 10;
@@ -420,7 +404,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
@@ -460,7 +444,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
@@ -502,7 +486,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = limit A 5;");
Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -514,15 +498,15 @@ public class TestEvalPipeline {
}
Assert.assertEquals(5, numIdentity);
}
-
+
@Test
public void testComplexData() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
Iterator<Tuple> it = pigServer.openIterator("b");
@@ -532,10 +516,10 @@ public class TestEvalPipeline {
Assert.assertEquals("2", t.get(2).toString());
Assert.assertEquals("value1", t.get(3).toString());
Assert.assertEquals("value2", t.get(4).toString());
-
+
//test with BinStorage
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
String output = "/pig/out/TestEvalPipeline-testComplexData";
pigServer.deleteFile(output);
@@ -549,17 +533,17 @@ public class TestEvalPipeline {
Assert.assertEquals("1", t.get(1).toString());
Assert.assertEquals("2", t.get(2).toString());
Assert.assertEquals("value1", t.get(3).toString());
- Assert.assertEquals("value2", t.get(4).toString());
+ Assert.assertEquals("value2", t.get(4).toString());
}
@Test
public void testBinStorageDetermineSchema() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
Iterator<Tuple> it = pigServer.openIterator("b");
@@ -569,10 +553,10 @@ public class TestEvalPipeline {
Assert.assertEquals(2, t.get(2));
Assert.assertEquals("value1", t.get(3).toString());
Assert.assertEquals("value2", t.get(4).toString());
-
+
//test with BinStorage
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
pigServer.deleteFile(output);
@@ -587,7 +571,7 @@ public class TestEvalPipeline {
String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b as t2b, m#'key1', m#'key2', b;",
"q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
"q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};
-
+
for (int i = 0; i < loads.length; i++) {
pigServer.registerQuery(loads[i]);
pigServer.registerQuery(generates[i]);
@@ -605,18 +589,18 @@ public class TestEvalPipeline {
for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
Tuple bt = bit.next();
Assert.assertEquals(String.class, bt.get(0).getClass());
- Assert.assertEquals(String.class, bt.get(1).getClass());
+ Assert.assertEquals(String.class, bt.get(1).getClass());
}
- }
+ }
}
@Test
public void testProjectBag() throws IOException, ExecException {
// This tests make sure that when a bag with multiple columns is
// projected all columns apear in the output
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"f1\tf2\tf3"});
- pigServer.registerQuery("a = load '"
+ pigServer.registerQuery("a = load '"
+ Util.generateURI(input.toString(), pigContext) + "' as (x, y, z);");
pigServer.registerQuery("b = group a by x;");
pigServer.registerQuery("c = foreach b generate flatten(a.(y, z));");
@@ -630,11 +614,11 @@ public class TestEvalPipeline {
@Test
public void testBinStorageDetermineSchema2() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"pigtester\t10\t1.2"});
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (name:chararray, age:int, gpa:double);");
String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
pigServer.deleteFile(output);
@@ -649,7 +633,7 @@ public class TestEvalPipeline {
String[] generates = {"q = foreach p generate name, age, gpa;",
"q = foreach p generate name, age, gpa;",
"q = foreach p generate $0, $1, $2;"};
-
+
for (int i = 0; i < loads.length; i++) {
pigServer.registerQuery(loads[i]);
pigServer.registerQuery(generates[i]);
@@ -662,7 +646,7 @@ public class TestEvalPipeline {
Assert.assertEquals(1.2, t.get(2));
Assert.assertEquals(Double.class, t.get(2).getClass());
}
-
+
// test that valid casting is allowed
pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
" as (name, age:long, gpa:float);");
@@ -675,7 +659,7 @@ public class TestEvalPipeline {
Assert.assertEquals(Long.class, t.get(1).getClass());
Assert.assertEquals(1.2f, t.get(2));
Assert.assertEquals(Float.class, t.get(2).getClass());
-
+
// test that implicit casts work
pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
" as (name, age, gpa);");
@@ -689,15 +673,15 @@ public class TestEvalPipeline {
Assert.assertEquals(1, t.get(2));
Assert.assertEquals(Integer.class, t.get(2).getClass());
}
-
+
@Test
public void testCogroupWithInputFromGroup() throws IOException, ExecException {
// Create input file with ascii data
- File input = Util.createInputFile("tmp", "",
- new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
"pigtester2\t10\t1.2",
"pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});
-
+
Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
// we will in essence be doing a group on first column and getting
// SUM over second column and a count for the group - store
@@ -705,13 +689,13 @@ public class TestEvalPipeline {
resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
- pigServer.registerQuery("c = load '"
- + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ pigServer.registerQuery("c = load '"
+ + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
+ "as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("d = cogroup b by group, c by name;");
pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
@@ -719,27 +703,28 @@ public class TestEvalPipeline {
for(int i = 0; i < resultMap.size(); i++) {
Tuple t = it.next();
Assert.assertEquals(true, resultMap.containsKey(t.get(0)));
- Pair<Long, Long> output = resultMap.get(t.get(0));
+ Pair<Long, Long> output = resultMap.get(t.get(0));
Assert.assertEquals(output.first, t.get(1));
Assert.assertEquals(output.second, t.get(2));
}
}
-
+
@Test
public void testUtf8Dump() throws IOException, ExecException {
-
+
// Create input file with unicode data
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"wendyξ"});
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext)
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext)
+ "' using PigStorage() " + "as (name:chararray);");
Iterator<Tuple> it = pigServer.openIterator("a");
Tuple t = it.next();
Assert.assertEquals("wendyξ", t.get(0));
-
+
}
+ @SuppressWarnings("unchecked")
@Test
public void testMapUDF() throws Exception{
int LOOP_COUNT = 2;
@@ -753,7 +738,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
String query = "C = foreach B {"
@@ -793,7 +778,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
String query = "C = foreach B {"
@@ -811,19 +796,19 @@ public class TestEvalPipeline {
@Test
public void testLoadCtorArgs() throws IOException, ExecException {
-
+
// Create input file
- File input = Util.createInputFile("tmp", "",
+ File input = Util.createInputFile("tmp", "",
new String[] {"hello:world"});
- pigServer.registerQuery("a = load '"
- + Util.generateURI(input.toString(), pigContext)
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(input.toString(), pigContext)
+ "' using org.apache.pig.test.PigStorageNoDefCtor(':');");
pigServer.registerQuery("b = foreach a generate (chararray)$0, (chararray)$1;");
Iterator<Tuple> it = pigServer.openIterator("b");
Tuple t = it.next();
Assert.assertEquals("hello", t.get(0));
Assert.assertEquals("world", t.get(1));
-
+
}
@Test
@@ -839,7 +824,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
@@ -881,7 +866,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A;");
String query = "C = foreach B {"
@@ -928,7 +913,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A;");
String query = "C = foreach B {"
@@ -969,7 +954,7 @@ public class TestEvalPipeline {
}
ps.close();
- pigServer.registerQuery("A = LOAD '"
+ pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A ;"); //the argument does not matter
pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter
@@ -990,7 +975,7 @@ public class TestEvalPipeline {
Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
}
-
+
@Test
public void testCogroupAfterDistinct() throws Exception {
String[] input1 = {
@@ -1011,13 +996,13 @@ public class TestEvalPipeline {
};
Util.createInputFile(cluster, "table1", input1);
Util.createInputFile(cluster, "table2", input2);
-
+
pigServer.registerQuery("nonuniqtable1 = LOAD 'table1' AS (f1:chararray);");
pigServer.registerQuery("table1 = DISTINCT nonuniqtable1;");
pigServer.registerQuery("table2 = LOAD 'table2' AS (f1:chararray, f2:int);");
pigServer.registerQuery("temp = COGROUP table1 BY f1 INNER, table2 BY f1;");
Iterator<Tuple> it = pigServer.openIterator("temp");
-
+
// results should be:
// (abc,{(abc)},{})
// (def,{(def)},{})
@@ -1025,7 +1010,7 @@ public class TestEvalPipeline {
HashMap<String, Tuple> results = new HashMap<String, Tuple>();
Object[] row = new Object[] { "abc",
Util.createBagOfOneColumn(new String[] { "abc"}), mBf.newDefaultBag() };
- results.put("abc", Util.createTuple(row));
+ results.put("abc", Util.createTuple(row));
row = new Object[] { "def",
Util.createBagOfOneColumn(new String[] { "def"}), mBf.newDefaultBag() };
results.put("def", Util.createTuple(row));
@@ -1044,16 +1029,16 @@ public class TestEvalPipeline {
Assert.assertEquals(expected.get(i++), field);
}
}
-
+
Util.deleteFile(cluster, "table1");
Util.deleteFile(cluster, "table2");
}
@Test
public void testAlgebraicDistinctProgress() throws Exception {
-
+
//creating a test input of larger than 1000 to make
- //sure that progress kicks in. The only way to test this
+ //sure that progress kicks in. The only way to test this
//is to add a log statement to the getDistinct
//method in Distinct.java. There is no automated mechanism
//to check this from pig
@@ -1066,66 +1051,66 @@ public class TestEvalPipeline {
inpString[i] = new Integer(i/2).toString();
inpString[i+1] = new Integer(i/2).toString();
}
-
+
Util.createInputFile(cluster, "table", inpString);
pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
pigServer.registerQuery("b = group a ALL;");
pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
Iterator<Tuple> it = pigServer.openIterator("c");
-
+
Integer[] exp = new Integer[inputSize/2];
for(int j = 0; j < inputSize/2; ++j) {
exp[j] = j;
}
DataBag expectedBag = Util.createBagOfOneColumn(exp);
-
+
while(it.hasNext()) {
Tuple tup = it.next();
Long resultBagSize = (Long)tup.get(0);
Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
}
-
- Util.deleteFile(cluster, "table");
+
+ Util.deleteFile(cluster, "table");
}
@Test
public void testBinStorageWithLargeStrings() throws Exception {
// Create input file with large strings
- int testSize = 100;
- String[] stringArray = new String[testSize];
- Random random = new Random();
- stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
- for(int i = 1; i < stringArray.length; ++i) {
- //generate a few large strings every 25th record
- if((i % 25) == 0) {
- stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
- } else {
- stringArray[i] = GenRandomData.genRandString(random);
- }
- }
-
- Util.createInputFile(cluster, "table", stringArray);
-
- //test with BinStorage
+ int testSize = 100;
+ String[] stringArray = new String[testSize];
+ Random random = new Random();
+ stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
+ for(int i = 1; i < stringArray.length; ++i) {
+ //generate a few large strings every 25th record
+ if((i % 25) == 0) {
+ stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
+ } else {
+ stringArray[i] = GenRandomData.genRandString(random);
+ }
+ }
+
+ Util.createInputFile(cluster, "table", stringArray);
+
+ //test with BinStorage
pigServer.registerQuery("a = load 'table' using PigStorage() " +
"as (c: chararray);");
String output = "/pig/out/TestEvalPipeline-testBinStorageLargeStrings";
pigServer.deleteFile(output);
pigServer.store("a", output, BinStorage.class.getName());
-
+
pigServer.registerQuery("b = load '" + output +"' using BinStorage() " +
- "as (c:chararray);");
+ "as (c:chararray);");
pigServer.registerQuery("c = foreach b generate c;");
-
+
Iterator<Tuple> it = pigServer.openIterator("c");
int counter = 0;
while(it.hasNext()) {
Tuple tup = it.next();
String resultString = (String)tup.get(0);
String expectedString = stringArray[counter];
- Assert.assertTrue(expectedString.equals(resultString));
+ Assert.assertTrue(expectedString.equals(resultString));
++counter;
}
Util.deleteFile(cluster, "table");
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri May 30 19:07:23 2014
@@ -494,192 +494,6 @@ public class TestEvalPipeline2 {
Util.deleteFile(cluster, "table_testNestedDescSort");
}
- // See PIG-282
- @Test
- public void testCustomPartitionerParseJoins() throws Exception{
- String[] input = {
- "1\t3",
- "1\t2"
- };
- Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
-
- // Custom Partitioner is not allowed for skewed joins, will throw a ExecException
- try {
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
- pigServer.registerQuery("B = ORDER A by $0;");
- pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
- //control should not reach here
- Assert.fail("Skewed join cannot accept a custom partitioner");
- } catch(FrontendException e) {
- Assert.assertTrue( e.getMessage().contains( "Custom Partitioner is not supported for skewed join" ) );
- }
-
- pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
- Iterator<Tuple> iter = pigServer.openIterator("hash");
- Tuple t;
-
- Collection<String> results = new HashSet<String>();
- results.add("(1,3,1,2)");
- results.add("(1,3,1,3)");
- results.add("(1,2,1,2)");
- results.add("(1,2,1,3)");
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- // No checks are made for merged and replicated joins as they are compiled to a map only job
- // No frontend error checking has been added for these jobs, hence not adding any test cases
- // Manually tested the sanity once. Above test should cover the basic sanity of the scenario
-
- Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
- }
-
- // See PIG-282
- @Test
- public void testCustomPartitionerGroups() throws Exception{
- String[] input = {
- "1\t1",
- "2\t1",
- "3\t1",
- "4\t1"
- };
- Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
-
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
-
- // It should be noted that for a map reduce job, the total number of partitions
- // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein
- // we will get more than one reduce job so that we can use the partitioner.
- // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
- // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
- // partition number is bigger than 1.
- //
- pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
-
- pigServer.store("B", "tmp_testCustomPartitionerGroups");
-
- new File("tmp_testCustomPartitionerGroups").mkdir();
-
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
- BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
- String line = null;
- while((line = reader.readLine()) != null) {
- Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
- }
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
- reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
- line = null;
- int count=0;
- while((line = reader.readLine()) != null) {
- //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
- count++;
- }
- Assert.assertEquals(4, count);
- Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
- Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
- Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
- }
-
- // See PIG-3385
- @Test
- public void testCustomPartitionerDistinct() throws Exception{
- String[] input = {
- "1\t1",
- "2\t1",
- "1\t1",
- "3\t1",
- "4\t1",
- };
- Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
-
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
- pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
- pigServer.store("B", "tmp_testCustomPartitionerDistinct");
-
- new File("tmp_testCustomPartitionerDistinct").mkdir();
-
- // SimpleCustomPartitioner3 simply partition all inputs to *second* reducer
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
- BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
- String line = null;
- while((line = reader.readLine()) != null) {
- Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
- }
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
- reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
- line = null;
- int count=0;
- while((line = reader.readLine()) != null) {
- //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
- count++;
- }
- Assert.assertEquals(4, count);
- Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
- Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
- Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
- }
-
- // See PIG-282
- @Test
- public void testCustomPartitionerCross() throws Exception{
- String[] input = {
- "1\t3",
- "1\t2",
- };
-
- Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
- pigServer.registerQuery("B = ORDER A by $0;");
- pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
- Iterator<Tuple> iter = pigServer.openIterator("C");
- Tuple t;
-
- Collection<String> results = new HashSet<String>();
- results.add("(1,3,1,2)");
- results.add("(1,3,1,3)");
- results.add("(1,2,1,2)");
- results.add("(1,2,1,3)");
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Util.deleteFile(cluster, "table_testCustomPartitionerCross");
- }
-
// See PIG-972
@Test
public void testDescribeNestedAlias() throws Exception{
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin.java Fri May 30 19:07:23 2014
@@ -55,10 +55,10 @@ public class TestFRJoin {
private static final String INPUT_FILE = "testFrJoinInput.txt";
private static final String INPUT_FILE2 = "testFrJoinInput2.txt";
private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
public TestFRJoin() throws ExecException, IOException {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
}
@Before
@@ -126,11 +126,10 @@ public class TestFRJoin {
pc.connect();
ld.setPc(pc);
- Tuple dummyTuple = null;
for (Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld
.getNextTuple()) {
Tuple tup = (Tuple)res.result;
- LoadFunc lf = ((LoadFunc)pc.instantiateFuncFromSpec(ld.getLFile().getFuncSpec()));
+ LoadFunc lf = ((LoadFunc)PigContext.instantiateFuncFromSpec(ld.getLFile().getFuncSpec()));
String key = lf.getLoadCaster().bytesToCharArray(
((DataByteArray)tup.get(keyField)).get());
Tuple csttup = TupleFactory.getInstance().newTuple(2);
@@ -530,6 +529,7 @@ public class TestFRJoin {
Schema frjSch = null, shjSch = null;
pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
frjSch = pigServer.dumpSchema("C");
+ assertNull(frjSch);
pigServer.registerQuery("C = join A by $0, B by $0;");
shjSch = pigServer.dumpSchema("C");
assertNull(shjSch);
@@ -556,6 +556,7 @@ public class TestFRJoin {
Schema frjSch = null, shjSch = null;
pigServer.registerQuery("D = join A by $0, B by $0, C by $0 using 'repl';");
frjSch = pigServer.dumpSchema("D");
+ assertNull(frjSch);
pigServer.registerQuery("D = join A by $0, B by $0, C by $0;");
shjSch = pigServer.dumpSchema("D");
assertNull(shjSch);
@@ -580,6 +581,7 @@ public class TestFRJoin {
Schema frjSch = null, shjSch = null;
pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using 'repl';");
frjSch = pigServer.dumpSchema("C");
+ assertNull(frjSch);
pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
shjSch = pigServer.dumpSchema("C");
assertNull(shjSch);
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Fri May 30 19:07:23 2014
@@ -43,26 +43,30 @@ import org.junit.Test;
public class TestFRJoin2 {
- private static MiniCluster cluster = MiniCluster.buildCluster();
-
+ // This class contains tests for
+ // - Concatenating small files before adding to DistributedCache (PIG-1458)
+ // - imposing size limit on files being added to DistributedCache
+ // Since Replicated join in Tez does not use DistributedCache, these tests are MR specific
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_MR);
+
private static final String INPUT_DIR = "frjoin";
private static final String INPUT_FILE = "input";
-
+
private static final int FILE_MERGE_THRESHOLD = 5;
private static final int MIN_FILE_MERGE_THRESHOLD = 1;
-
+
//contents of input dir joined by comma
private static String concatINPUT_DIR = null;
private File logFile;
-
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
StringBuilder strBuilder = new StringBuilder();
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(new Path(INPUT_DIR));
int LOOP_SIZE = 2;
- for (int i=0; i<FILE_MERGE_THRESHOLD; i++) {
+ for (int i=0; i<FILE_MERGE_THRESHOLD; i++) {
String[] input = new String[2*LOOP_SIZE];
for (int n=0; n<LOOP_SIZE; n++) {
for (int j=0; j<LOOP_SIZE;j++) {
@@ -76,7 +80,7 @@ public class TestFRJoin2 {
}
strBuilder.deleteCharAt(strBuilder.length() - 1);
concatINPUT_DIR = strBuilder.toString();
-
+
String[] input2 = new String[2*(LOOP_SIZE/2)];
int k = 0;
for (int i=1; i<=LOOP_SIZE/2; i++) {
@@ -93,179 +97,175 @@ public class TestFRJoin2 {
cluster.shutDown();
}
- // test simple scalar alias with file concatenation following
+ // test simple scalar alias with file concatenation following
// a MapReduce job
@Test
public void testConcatenateJobForScalar() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
-
- // using $0*0, instead of group-all because group-all sets parallelism to 1
- pigServer.registerQuery("B = group A by $0*0 parallel 5;");
+
+ // using $0*0, instead of group-all because group-all sets parallelism to 1
+ pigServer.registerQuery("B = group A by $0*0 parallel 5;");
pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.y) as max;");
-
+
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.getPigContext().getProperties().setProperty(
MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
-
+
pigServer.registerQuery("D= foreach A generate x / C.count, C.max - y;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
-
+
JobGraph jGraph = PigStats.get().getJobGraph();
assertEquals(3, jGraph.size());
- // find added map-only concatenate job
+ // find added map-only concatenate job
MRJobStats js = (MRJobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
- assertEquals(1, js.getNumberMaps());
- assertEquals(0, js.getNumberReduces());
+ assertEquals(1, js.getNumberMaps());
+ assertEquals(0, js.getNumberReduces());
}
{
pigServer.getPigContext().getProperties().setProperty(
"pig.noSplitCombination", "true");
-
+
pigServer.registerQuery("D= foreach A generate x / C.count, C.max - y;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
-
+
assertEquals(2, PigStats.get().getJobGraph().size());
}
-
+
assertEquals(dbfrj.size(), dbshj.size());
- assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
- // test simple scalar alias with file concatenation following
+
+ // test simple scalar alias with file concatenation following
// a Map-only job
@Test
public void testConcatenateJobForScalar2() throws Exception {
logFile = Util.resetLog(MRCompiler.class, logFile);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "/{part-00*}" +"' as (x:int,y:int);");
pigServer.registerQuery("C = filter B by (x == 3) AND (y == 2);");
-
+
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.getPigContext().getProperties().setProperty(
MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
-
+
pigServer.registerQuery("D = foreach A generate x / C.x, y + C.y;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
-
+
JobGraph jGraph = PigStats.get().getJobGraph();
assertEquals(3, jGraph.size());
- // find added map-only concatenate job
+ // find added map-only concatenate job
MRJobStats js = (MRJobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
- assertEquals(1, js.getNumberMaps());
- assertEquals(0, js.getNumberReduces());
- Util.checkLogFileMessage(logFile,
- new String[] {"number of input files: 0", "failed to get number of input files"},
+ assertEquals(1, js.getNumberMaps());
+ assertEquals(0, js.getNumberReduces());
+ Util.checkLogFileMessage(logFile,
+ new String[] {"number of input files: 0", "failed to get number of input files"},
false
);
}
{
pigServer.getPigContext().getProperties().setProperty(
"pig.noSplitCombination", "true");
-
+
pigServer.registerQuery("D = foreach A generate x / C.x, y + C.y;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
-
+
assertEquals(2, PigStats.get().getJobGraph().size());
}
-
+
assertEquals(dbfrj.size(), dbshj.size());
- assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
- // test scalar alias with file concatenation following
+
+ // test scalar alias with file concatenation following
// a multi-query job
@Test
public void testConcatenateJobForScalar3() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("C = group A all parallel 5;");
pigServer.registerQuery("D = foreach C generate COUNT(A) as count;");
pigServer.registerQuery("E = foreach C generate MAX(A.x) as max;");
-
+
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.getPigContext().getProperties().setProperty(
MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
-
+
pigServer.registerQuery("F = foreach B generate x / D.count, y + E.max;");
Iterator<Tuple> iter = pigServer.openIterator("F");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
-
+
JobGraph jGraph = PigStats.get().getJobGraph();
assertEquals(4, jGraph.size());
}
{
pigServer.getPigContext().getProperties().setProperty(
"pig.noSplitCombination", "true");
-
+
pigServer.registerQuery("F = foreach B generate x / D.count, y + E.max;");
Iterator<Tuple> iter = pigServer.openIterator("F");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
-
+
assertEquals(2, PigStats.get().getJobGraph().size());
}
-
+
assertEquals(dbfrj.size(), dbshj.size());
- assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testConcatenateJobForFRJoin() throws Exception {
logFile = Util.resetLog(MRCompiler.class, logFile);
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "/{part-00*}" + "' as (x:int,y:int);");
-
+
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.getPigContext().getProperties().setProperty(
- MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
-
+ MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
+
pigServer.registerQuery("C = join A by y, B by y using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
-
+
assertEquals(3, PigStats.get().getJobGraph().size());
- Util.checkLogFileMessage(logFile,
- new String[] {"number of input files: 0", "failed to get number of input files"},
+ Util.checkLogFileMessage(logFile,
+ new String[] {"number of input files: 0", "failed to get number of input files"},
false
);
@@ -273,28 +273,27 @@ public class TestFRJoin2 {
{
pigServer.getPigContext().getProperties().setProperty(
"pig.noSplitCombination", "true");
-
+
pigServer.registerQuery("C = join A by y, B by y using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
-
+
assertEquals(2, PigStats.get().getJobGraph().size());
}
-
+
assertEquals(dbfrj.size(), dbshj.size());
- assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testTooManyReducers() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
- .getProperties());
-
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
- pigServer.registerQuery("B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";");
+ pigServer.registerQuery("B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";");
pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
@@ -302,52 +301,52 @@ public class TestFRJoin2 {
MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
Tuple t = iter.next();
- dbfrj.add(t);
+ dbfrj.add(t);
}
-
+
JobGraph jGraph = PigStats.get().getJobGraph();
assertEquals(3, jGraph.size());
- // find added map-only concatenate job
+ // find added map-only concatenate job
MRJobStats js = (MRJobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
- assertEquals(1, js.getNumberMaps());
- assertEquals(0, js.getNumberReduces());
+ assertEquals(1, js.getNumberMaps());
+ assertEquals(0, js.getNumberReduces());
}
{
pigServer.getPigContext().getProperties().setProperty(
"pig.noSplitCombination", "true");
pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
Tuple t = iter.next();
- dbshj.add(t);
+ dbshj.add(t);
}
assertEquals(2, PigStats.get().getJobGraph().size());
- }
+ }
assertEquals(dbfrj.size(), dbshj.size());
- assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testUnknownNumMaps() throws Exception {
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
+
pigServer.registerQuery("A = LOAD '" + concatINPUT_DIR + "' as (x:int,y:int);");
pigServer.registerQuery("B = Filter A by x < 50;");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.getPigContext().getProperties().setProperty(
- MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
+ MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
-
+
JobGraph jGraph = PigStats.get().getJobGraph();
assertEquals(3, jGraph.size());
}
@@ -356,30 +355,30 @@ public class TestFRJoin2 {
"pig.noSplitCombination", "true");
pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
assertEquals(2, PigStats.get().getJobGraph().size());
}
assertEquals(dbfrj.size(), dbshj.size());
- assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testUnknownNumMaps2() throws Exception {
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-
+
pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("C = join A by x, B by x using 'repl';");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.getPigContext().getProperties().setProperty(
- MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
+ MRCompiler.FILE_CONCATENATION_THRESHOLD, String.valueOf(MIN_FILE_MERGE_THRESHOLD));
pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
@@ -392,7 +391,7 @@ public class TestFRJoin2 {
"pig.noSplitCombination", "true");
pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
@@ -404,8 +403,7 @@ public class TestFRJoin2 {
@Test
public void testTooBigReplicatedFile() throws Exception {
- PigServer pigServer = new PigServer(
- ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
Modified: pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoinNullValue.java Fri May 30 19:07:23 2014
@@ -20,22 +20,20 @@ package org.apache.pig.test;
import java.util.Iterator;
-import junit.framework.Assert;
-
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFRJoinNullValue {
- private static MiniCluster cluster = MiniCluster.buildCluster();
-
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
String[] input = new String[4];
@@ -52,14 +50,14 @@ public class TestFRJoinNullValue {
@Test
public void testNullMatch() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD 'input';");
pigServer.registerQuery("B = LOAD 'input';");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
@@ -67,25 +65,25 @@ public class TestFRJoinNullValue {
{
pigServer.registerQuery("C = join A by $0, B by $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
}
Assert.assertEquals(dbfrj.size(), dbshj.size());
- Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testTupleNullMatch() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD 'input' as (x:int,y:int,z:int);");
pigServer.registerQuery("B = LOAD 'input' as (x:int,y:int,z:int);");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.registerQuery("C = join A by (x, y), B by (x, y) using 'replicated';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
@@ -93,25 +91,25 @@ public class TestFRJoinNullValue {
{
pigServer.registerQuery("C = join A by (x, y), B by (x, y);");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
}
Assert.assertEquals(dbfrj.size(), dbshj.size());
- Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testLeftNullMatch() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD 'input' as (x:int,y:int, z:int);");
pigServer.registerQuery("B = LOAD 'input' as (x:int,y:int, z:int);");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.registerQuery("C = join A by $0 left, B by $0 using 'replicated';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
@@ -119,25 +117,25 @@ public class TestFRJoinNullValue {
{
pigServer.registerQuery("C = join A by $0 left, B by $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
}
Assert.assertEquals(dbfrj.size(), dbshj.size());
- Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
-
+
@Test
public void testTupleLeftNullMatch() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD 'input' as (x:int,y:int,z:int);");
pigServer.registerQuery("B = LOAD 'input' as (x:int,y:int,z:int);");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
{
pigServer.registerQuery("C = join A by (x, y) left, B by (x, y) using 'replicated';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbfrj.add(iter.next());
}
@@ -145,12 +143,12 @@ public class TestFRJoinNullValue {
{
pigServer.registerQuery("C = join A by (x, y) left, B by (x, y);");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
while(iter.hasNext()) {
dbshj.add(iter.next());
}
}
Assert.assertEquals(dbfrj.size(), dbshj.size());
- Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFilterUDF.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFilterUDF.java Fri May 30 19:07:23 2014
@@ -17,7 +17,6 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
@@ -27,7 +26,6 @@ import java.io.PrintWriter;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
@@ -40,7 +38,7 @@ import org.junit.Test;
public class TestFilterUDF {
private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private File tmpFile;
TupleFactory tf = TupleFactory.getInstance();
@@ -57,7 +55,7 @@ public class TestFilterUDF {
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
int LOOP_SIZE = 20;
tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
Modified: pig/trunk/test/org/apache/pig/test/TestFinish.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFinish.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFinish.java Fri May 30 19:07:23 2014
@@ -34,7 +34,6 @@ import org.apache.pig.builtin.PigStorage
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.FileLocalizer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -46,7 +45,7 @@ public class TestFinish {
BagFactory mBf = BagFactory.getInstance();
File f1;
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
static public class MyEvalFunction extends EvalFunc<Tuple> {
String execType;
@@ -78,10 +77,8 @@ public class TestFinish {
@Before
public void setUp() throws Exception {
- // re initialize FileLocalizer so that each test runs correctly without
- // any side effect of other tests - this is needed here since some
- // tests are in mapred and some in local mode
- FileLocalizer.setInitialized(false);
+ // Reset state since some tests are in mapred and some in local mode
+ Util.resetStateForExecModeSwitch();
}
@AfterClass
@@ -102,7 +99,7 @@ public class TestFinish {
}
ps.close();
} else {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
f1 = File.createTempFile("test", "txt");
f1.delete();
inputFileName = Util.removeColon(f1.getAbsolutePath());
@@ -118,7 +115,7 @@ public class TestFinish {
private void checkAndCleanup(ExecType execType, String expectedFileName,
String inputFileName) throws IOException {
- if (execType == ExecType.MAPREDUCE) {
+ if (execType == cluster.getExecType()) {
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
cluster.getProperties()));
assertTrue(fs.exists(new Path(expectedFileName)));
@@ -136,7 +133,7 @@ public class TestFinish {
@Test
public void testFinishInMapMR() throws Exception {
- String inputFileName = setUp(ExecType.MAPREDUCE);
+ String inputFileName = setUp(cluster.getExecType());
// this file will be created on the cluster if finish() is called
String expectedFileName = "testFinishInMapMR-finish.txt";
pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('MAPREDUCE','"
@@ -149,13 +146,13 @@ public class TestFinish {
iter.next();
}
- checkAndCleanup(ExecType.MAPREDUCE, expectedFileName, inputFileName);
+ checkAndCleanup(cluster.getExecType(), expectedFileName, inputFileName);
}
@Test
public void testFinishInReduceMR() throws Exception {
- String inputFileName = setUp(ExecType.MAPREDUCE);
+ String inputFileName = setUp(cluster.getExecType());
// this file will be created on the cluster if finish() is called
String expectedFileName = "testFinishInReduceMR-finish.txt";
pigServer.registerQuery("define MYUDF " + MyEvalFunction.class.getName() + "('MAPREDUCE','"
@@ -169,7 +166,7 @@ public class TestFinish {
iter.next();
}
- checkAndCleanup(ExecType.MAPREDUCE, expectedFileName, inputFileName);
+ checkAndCleanup(cluster.getExecType(), expectedFileName, inputFileName);
}
@Test