You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC
svn commit: r614325 [6/6] - in /incubator/pig/branches/types: ./ lib/
scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/
src/org/apache/pig/data/ src/org/apache/pig/impl/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/
src/org/apac...
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@@ -73,7 +72,6 @@
assertFalse(da1.compareTo(da2) > 0);
}
- /* Replaced by TestTuple.java
@Test
public void testTuple() throws Exception {
int arity = 5;
@@ -160,9 +158,8 @@
n1.appendTuple(n2);
assertTrue(n1.arity() == n1Arity + n2Arity);
}
- */
- /* Replaced by TestDataBag.java
+ /*
@Test
public void testDataBag() throws Exception {
int[] input1 = { 1, 2, 3, 4, 5 };
@@ -197,8 +194,19 @@
caught = true;
}
assertTrue(caught);
+
+ // check that notifications are sent
+ b.clear();
+ DataBag.notifyInterval = 2;
+ Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1);
+ for (int i = 0; i < 10; i++) {
+ b.add(g);
+ }
+
+ Iterator it = b.content();
+ while (it.hasNext()) it.next();
+ assert(b.numNotifies == 5);
}
- */
@Test
@@ -208,198 +216,81 @@
public void testBigDataBagOnDisk() throws Exception{
Runtime.getRuntime().gc();
- //testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000);
+ testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
}
+ */
+ private enum TestType {
+ PRE_SORT,
+ POST_SORT,
+ PRE_DISTINCT,
+ POST_DISTINCT,
+ NONE
+ }
+
+ /*
private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception {
BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
- File tmp = File.createTempFile("test", "bag").getParentFile();
- BigDataBag bag = new BigDataBag(Datum.DataType.TUPLE, tmp);
- Iterator<Datum> it;
- int count;
- //String last;
- Tuple lastT = null;
-
Random r = new Random();
-
-
- //Basic test
- assertTrue(bag.isEmpty());
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(2);
- t.setField(0, Integer.toHexString(i));
- t.setField(1, i);
- bag.add(t);
- }
-
- assertFalse(bag.isEmpty());
-
- assertTrue(bag.cardinality() == numItems);
-
- int lastI = -1;
- it = bag.content();
- count = 0;
- while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
- int ix = Integer.parseInt(t.getAtomField(0).strval(), 16);
- assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval()));
- assertEquals(lastI+1, ix);
- lastI = ix;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.clear();
-
- //Test pre sort
-
- bag.sort();
-
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- t.setField(0, r.nextInt(100000));
- bag.add(t);
- }
-
- it = bag.content();
- count = 0;
- // last= "";
- lastT = new Tuple();
- while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
- /*
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<=0);
- last = next;
- */
- assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
- lastT = t;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.clear();
-
-
- //Test post sort
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- t.setField(0, r.nextInt(100000));
- bag.add(t);
- }
-
- bag.sort();
+
+ for (TestType testType: TestType.values()){
+ BigDataBag bag = BagFactory.getInstance().getNewBigBag();
+
+ assertTrue(bag.isEmpty());
+
+ if (testType == TestType.PRE_SORT)
+ bag.sort();
+ else if (testType == TestType.PRE_DISTINCT)
+ bag.distinct();
+
+ //generate data and add it to the bag
+ for(int i = 0; i < numItems; i++) {
+ Tuple t = new Tuple(1);
+ t.setField(0, r.nextInt(numItems));
+ bag.add(t);
+ }
+
+ assertFalse(bag.isEmpty());
+
+ if (testType == TestType.POST_SORT)
+ bag.sort();
+ else if (testType == TestType.POST_DISTINCT)
+ bag.distinct();
+
+
+ if (testType == TestType.NONE)
+ assertTrue(bag.cardinality() == numItems);
+ checkContents(bag, numItems, testType);
+ checkContents(bag, numItems, testType);
- it = bag.content();
- count = 0;
- //last= "";
- lastT = new Tuple();
- while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
- /*
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<=0);
- last = next;
- */
- assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
- lastT = t;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.clear();
-
- //test post-distinct
-
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- //To get a lot of duplicates
- t.setField(0, r.nextInt(1000));
- bag.add(t);
- }
+ }
+ }
+
+
+ private void checkContents(DataBag bag, int numItems, TestType testType) throws Exception{
+ String last = "";
+ DataBag.notifyInterval = 100;
- bag.distinct();
-
- it = bag.content();
- count = 0;
- //last= "";
- lastT = new Tuple();
+ Iterator<Tuple> it = bag.content();
+ int count = 0;
while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
- /*
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<0);
+ Tuple t = it.next();
+ String next = t.getAtomField(0).strval();
+ if (testType == TestType.POST_SORT || testType == TestType.PRE_SORT)
+ assertTrue(last.compareTo(next)<=0);
+ else if (testType == TestType.POST_DISTINCT || testType == TestType.PRE_DISTINCT)
+ assertTrue(last.compareTo(next)<0);
last = next;
- */
- assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
- lastT = t;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.clear();
-
-
- //Test pre distinct
-
- bag.distinct();
-
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- //To get a lot of duplicates
- t.setField(0, r.nextInt(10));
- bag.add(t);
+ count++;
}
-
- it = bag.content();
- count = 0;
- //last= "";
- lastT = new Tuple();
- while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
- /*
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<0);
- last = next;
- */
- assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
- lastT = t;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- //Check if it gives the correct contents the second time around
- it = bag.content();
- count = 0;
- //last= "";
- lastT = new Tuple();
- while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
- /*
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<0);
- last = next;
- */
- assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
- lastT = t;
- count++;
- }
-
assertTrue(bag.cardinality() == count);
- bag.clear();
+ if (testType != TestType.NONE)
+ assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
}
+ */
+
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Tue Jan 22 13:17:12 2008
@@ -15,50 +15,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import org.junit.Test;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigServer;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.PigFile;
-
-import junit.framework.TestCase;
-
-public class TestEvalPipeline extends TestCase {
-
- String initString = "mapreduce";
-
-
- static public class MyBagFunction extends EvalFunc<DataBag>{
- @Override
- public void exec(Tuple input, DataBag output) throws IOException {
- output.add(new Tuple("a"));
- output.add(new Tuple("a"));
- output.add(new Tuple("a"));
-
- }
- }
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.junit.Test;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.TextLoader;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.PigFile;
+
+import junit.framework.TestCase;
+
+public class TestEvalPipeline extends TestCase {
+
+ String initString = "mapreduce";
+
+
+ static public class MyBagFunction extends EvalFunc<DataBag>{
+ @Override
+ public void exec(Tuple input, DataBag output) throws IOException {
+ output.add(new Tuple("a"));
+ output.add(new Tuple("a"));
+ output.add(new Tuple("a"));
+
+ }
+ }
+
private File createFile(String[] data) throws Exception{
File f = File.createTempFile("tmp", "");
@@ -84,232 +83,230 @@
assertEquals(iter.next().getAtomField(0).numval(), 0.0);
}
- }
-
- @Test
- public void testJoin() throws Exception{
- PigServer pigServer = new PigServer(initString);
-
- File f1 = createFile(new String[]{"a:1","b:1","a:1"});
- File f2 = createFile(new String[]{"b","b","a"});
-
- pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');");
- pigServer.registerQuery("b = load 'file:" + f2 + "';");
- pigServer.registerQuery("c = cogroup a by $0, b by $0;");
- pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
-
- Iterator<Tuple> iter = pigServer.openIterator("d");
- int count = 0;
- while(iter.hasNext()){
- Tuple t = iter.next();
- assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
- count++;
- }
- assertEquals(count, 4);
- }
-
- @Test
- public void testDriverMethod() throws Exception{
- PigServer pigServer = new PigServer(initString);
- File f = File.createTempFile("tmp", "");
- PrintWriter pw = new PrintWriter(f);
- pw.println("a");
- pw.println("a");
- pw.close();
- pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
- pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
- Iterator<Tuple> iter = pigServer.openIterator("a");
- int count = 0;
- while(iter.hasNext()){
- Tuple t = iter.next();
- assertTrue(t.getAtomField(0).strval().equals("1"));
- assertTrue(t.getAtomField(1).strval().equals("a"));
- count++;
- }
- assertEquals(count, 6);
- f.delete();
- }
-
-
- /* Replaced by TestDataMap.java
- @Test
- public void testMapLookup() throws IOException{
- PigServer pigServer = new PigServer(initString);
- DataBag b = new DataBag();
- DataMap colors = new DataMap();
- colors.put("apple","red");
- colors.put("orange","orange");
-
- DataMap weights = new DataMap();
- weights.put("apple","0.1");
- weights.put("orange","0.3");
-
- Tuple t = new Tuple();
- t.appendField(colors);
- t.appendField(weights);
- b.add(t);
-
- String fileName = "file:"+File.createTempFile("tmp", "");
- PigFile f = new PigFile(fileName);
- f.store(b, new BinStorage(), pigServer.getPigContext());
-
-
- pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();");
- pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
- Iterator<Tuple> iter = pigServer.openIterator("b");
- t = iter.next();
- assertEquals(t.getAtomField(0).strval(), "red");
- assertEquals(t.getAtomField(1).numval(), 0.3);
- assertFalse(iter.hasNext());
- }
- */
-
-
- static public class TitleNGrams extends EvalFunc<DataBag> {
-
- @Override
- public void exec(Tuple input, DataBag output) throws IOException {
- String str = input.getAtomField(0).strval();
-
- String title = str;
-
- if (title != null) {
- List<String> nGrams = makeNGrams(title);
-
- for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
- Tuple t = new Tuple(1);
- t.setField(0, it.next());
- output.add(t);
- }
- }
- }
-
-
- List<String> makeNGrams(String str) {
- List<String> tokens = new ArrayList<String>();
-
- StringTokenizer st = new StringTokenizer(str);
- while (st.hasMoreTokens())
- tokens.add(st.nextToken());
-
- return nGramHelper(tokens, new ArrayList<String>());
- }
-
- ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
- if (str.size() == 0)
- return nGrams;
-
- for (int i = 0; i < str.size(); i++)
- nGrams.add(makeString(str.subList(0, i+1)));
-
- return nGramHelper(str.subList(1, str.size()), nGrams);
- }
-
- String makeString(List<String> list) {
- StringBuffer sb = new StringBuffer();
- for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
- sb.append(it.next());
- if (it.hasNext())
- sb.append(" ");
- }
- return sb.toString();
- }
- }
-
-
-
-
- @Test
- public void testBagFunctionWithFlattening() throws Exception{
- PigServer pigServer = new PigServer(initString);
- File queryLogFile = createFile(
- new String[]{
- "stanford\tdeer\tsighting",
- "bush\tpresident",
- "stanford\tbush",
- "conference\tyahoo",
- "world\tcup\tcricket",
- "bush\twins",
- "stanford\tpresident",
- }
- );
-
- File newsFile = createFile(
- new String[]{
- "deer seen at stanford",
- "george bush visits stanford",
- "yahoo hosting a conference in the bay area",
- "who will win the world cup"
- }
- );
-
- Map<String, Integer> expectedResults = new HashMap<String, Integer>();
- expectedResults.put("bush", 2);
- expectedResults.put("stanford", 3);
- expectedResults.put("world", 1);
- expectedResults.put("conference", 1);
-
- pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile + "' USING " + TextLoader.class.getName() + "();");
- pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + "';");
-
- pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
- pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
- pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
-
- Iterator<Tuple> iter = pigServer.openIterator("answer");
- while(iter.hasNext()){
- Tuple t = iter.next();
- assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
- }
- }
-
-
-
- @Test
- public void testSort() throws Exception{
- testSortDistinct(false);
- }
-
-
- @Test
- public void testDistinct() throws Exception{
- testSortDistinct(true);
- }
-
- private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
- int LOOP_SIZE = 1024*16;
- File tmpFile = File.createTempFile("test", "txt");
- PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- Random r = new Random();
- for(int i = 0; i < LOOP_SIZE; i++) {
- ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
- }
- ps.close();
-
- PigServer pig = new PigServer(initString);
- String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString();
- pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
- if (eliminateDuplicates){
- pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
- }else{
- pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
- }
- pig.store("B", tmpOutputFile);
-
- pig.registerQuery("A = load '" + tmpOutputFile + "';");
- Iterator<Tuple> iter = pig.openIterator("A");
- int last = -1;
- while (iter.hasNext()){
- Tuple t = iter.next();
- if (eliminateDuplicates){
- assertTrue(last < t.getAtomField(0).numval().intValue());
- }else{
- assertTrue(last <= t.getAtomField(0).numval().intValue());
- assertEquals(t.arity(), 2);
- }
- }
-
- }
-
-
-}
+ }
+
+ @Test
+ public void testJoin() throws Exception{
+ PigServer pigServer = new PigServer(initString);
+
+ File f1 = createFile(new String[]{"a:1","b:1","a:1"});
+ File f2 = createFile(new String[]{"b","b","a"});
+
+ pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');");
+ pigServer.registerQuery("b = load 'file:" + f2 + "';");
+ pigServer.registerQuery("c = cogroup a by $0, b by $0;");
+ pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
+
+ Iterator<Tuple> iter = pigServer.openIterator("d");
+ int count = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
+ count++;
+ }
+ assertEquals(count, 4);
+ }
+
+ @Test
+ public void testDriverMethod() throws Exception{
+ PigServer pigServer = new PigServer(initString);
+ File f = File.createTempFile("tmp", "");
+ PrintWriter pw = new PrintWriter(f);
+ pw.println("a");
+ pw.println("a");
+ pw.close();
+ pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
+ pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
+ Iterator<Tuple> iter = pigServer.openIterator("a");
+ int count = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertTrue(t.getAtomField(0).strval().equals("1"));
+ assertTrue(t.getAtomField(1).strval().equals("a"));
+ count++;
+ }
+ assertEquals(count, 6);
+ f.delete();
+ }
+
+
+ @Test
+ public void testMapLookup() throws IOException{
+ PigServer pigServer = new PigServer(initString);
+ DataBag b = BagFactory.getInstance().newDefaultBag();
+ DataMap colors = new DataMap();
+ colors.put("apple","red");
+ colors.put("orange","orange");
+
+ DataMap weights = new DataMap();
+ weights.put("apple","0.1");
+ weights.put("orange","0.3");
+
+ Tuple t = new Tuple();
+ t.appendField(colors);
+ t.appendField(weights);
+ b.add(t);
+
+ String fileName = "file:"+File.createTempFile("tmp", "");
+ PigFile f = new PigFile(fileName);
+ f.store(b, new BinStorage(), pigServer.getPigContext());
+
+
+ pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();");
+ pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
+ Iterator<Tuple> iter = pigServer.openIterator("b");
+ t = iter.next();
+ assertEquals(t.getAtomField(0).strval(), "red");
+ assertEquals(t.getAtomField(1).numval(), 0.3);
+ assertFalse(iter.hasNext());
+ }
+
+
+ static public class TitleNGrams extends EvalFunc<DataBag> {
+
+ @Override
+ public void exec(Tuple input, DataBag output) throws IOException {
+ String str = input.getAtomField(0).strval();
+
+ String title = str;
+
+ if (title != null) {
+ List<String> nGrams = makeNGrams(title);
+
+ for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
+ Tuple t = new Tuple(1);
+ t.setField(0, it.next());
+ output.add(t);
+ }
+ }
+ }
+
+
+ List<String> makeNGrams(String str) {
+ List<String> tokens = new ArrayList<String>();
+
+ StringTokenizer st = new StringTokenizer(str);
+ while (st.hasMoreTokens())
+ tokens.add(st.nextToken());
+
+ return nGramHelper(tokens, new ArrayList<String>());
+ }
+
+ ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
+ if (str.size() == 0)
+ return nGrams;
+
+ for (int i = 0; i < str.size(); i++)
+ nGrams.add(makeString(str.subList(0, i+1)));
+
+ return nGramHelper(str.subList(1, str.size()), nGrams);
+ }
+
+ String makeString(List<String> list) {
+ StringBuffer sb = new StringBuffer();
+ for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
+ sb.append(it.next());
+ if (it.hasNext())
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+ }
+
+
+
+
+ @Test
+ public void testBagFunctionWithFlattening() throws Exception{
+ PigServer pigServer = new PigServer(initString);
+ File queryLogFile = createFile(
+ new String[]{
+ "stanford\tdeer\tsighting",
+ "bush\tpresident",
+ "stanford\tbush",
+ "conference\tyahoo",
+ "world\tcup\tcricket",
+ "bush\twins",
+ "stanford\tpresident",
+ }
+ );
+
+ File newsFile = createFile(
+ new String[]{
+ "deer seen at stanford",
+ "george bush visits stanford",
+ "yahoo hosting a conference in the bay area",
+ "who will win the world cup"
+ }
+ );
+
+ Map<String, Integer> expectedResults = new HashMap<String, Integer>();
+ expectedResults.put("bush", 2);
+ expectedResults.put("stanford", 3);
+ expectedResults.put("world", 1);
+ expectedResults.put("conference", 1);
+
+ pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile + "' USING " + TextLoader.class.getName() + "();");
+ pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + "';");
+
+ pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
+ pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
+ pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("answer");
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
+ }
+ }
+
+
+
+ @Test
+ public void testSort() throws Exception{
+ testSortDistinct(false);
+ }
+
+
+ @Test
+ public void testDistinct() throws Exception{
+ testSortDistinct(true);
+ }
+
+ private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
+ int LOOP_SIZE = 1024*16;
+ File tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ Random r = new Random();
+ for(int i = 0; i < LOOP_SIZE; i++) {
+ ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
+ }
+ ps.close();
+
+ PigServer pig = new PigServer(initString);
+ String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString();
+ pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+ if (eliminateDuplicates){
+ pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
+ }else{
+ pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+ }
+ pig.store("B", tmpOutputFile);
+
+ pig.registerQuery("A = load '" + tmpOutputFile + "';");
+ Iterator<Tuple> iter = pig.openIterator("A");
+ int last = -1;
+ while (iter.hasNext()){
+ Tuple t = iter.next();
+ if (eliminateDuplicates){
+ assertTrue(last < t.getAtomField(0).numval().intValue());
+ }else{
+ assertTrue(last <= t.getAtomField(0).numval().intValue());
+ assertEquals(t.arity(), 2);
+ }
+ }
+
+ }
+
+
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Jan 22 13:17:12 2008
@@ -78,9 +78,9 @@
}
@Override
public void exec(Tuple input, DataBag output) throws IOException {
- Iterator<Datum> it = (input.getBagField(0)).content();
+ Iterator<Tuple> it = (input.getBagField(0)).iterator();
while(it.hasNext()) {
- Tuple t = (Tuple)it.next();
+ Tuple t = it.next();
Tuple newT = new Tuple(2);
newT.setField(0, field0);
newT.setField(1, t.getField(0).toString());
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java Tue Jan 22 13:17:12 2008
@@ -33,18 +33,14 @@
import org.apache.pig.PigServer;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.impl.io.PigFile;
import org.apache.pig.impl.PigContext;
public class TestPigFile extends TestCase {
- DataBag bag = new DataBag(Datum.DataType.TUPLE);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
Random rand = new Random();
@Override
@@ -89,13 +85,13 @@
DataBag loaded = load.load(new PigStorage(), pigContext);
System.out.println("Done.");
- assertTrue(bag.cardinality() == loaded.cardinality());
+ assertTrue(bag.size() == loaded.size());
- Iterator<Datum> it1 = bag.content();
- Iterator<Datum> it2 = loaded.content();
+ Iterator<Tuple> it1 = bag.iterator();
+ Iterator<Tuple> it2 = loaded.iterator();
while (it1.hasNext() && it2.hasNext()) {
- Tuple f1 = (Tuple)it1.next();
- Tuple f2 = (Tuple)it2.next();
+ Tuple f1 = it1.next();
+ Tuple f2 = it2.next();
assertTrue(f1.equals(f2));
}
assertFalse(it1.hasNext() || it2.hasNext());
@@ -131,7 +127,7 @@
private DataBag getRandomBag(int maxCardinality, int nestingLevel) throws IOException{
int cardinality = rand.nextInt(maxCardinality)+1;
- DataBag b = new DataBag(Datum.DataType.TUPLE);
+ DataBag b = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<cardinality; i++){
Tuple t = getRandomTuple(nestingLevel+1);
b.add(t);
@@ -168,14 +164,13 @@
DataBag loaded = load.load(new BinStorage(), pigContext);
System.out.println("Done.");
- assertTrue(bag.cardinality() == loaded.cardinality());
+ assertTrue(bag.size() == loaded.size());
- Iterator<Datum> it1 = bag.content();
- Iterator<Datum> it2 = loaded.content();
- //while (it1.hasNext() && it2.hasNext()) {
- for (int i = 0; it1.hasNext() && it2.hasNext(); i++) {
- Tuple f1 = (Tuple)it1.next();
- Tuple f2 = (Tuple)it2.next();
+ Iterator<Tuple> it1 = bag.iterator();
+ Iterator<Tuple> it2 = loaded.iterator();
+ while (it1.hasNext() && it2.hasNext()) {
+ Tuple f1 = it1.next();
+ Tuple f2 = it2.next();
assertTrue(f1.equals(f2));
}
assertFalse(it1.hasNext() || it2.hasNext());
Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Tue Jan 22 13:17:12 2008
@@ -19,8 +19,7 @@
import java.io.IOException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
import org.apache.pig.data.Datum;
import org.apache.pig.data.DataAtom;
@@ -29,7 +28,7 @@
// =================
static public Tuple loadFlatTuple(Tuple t, int[] input) throws IOException {
for (int i = 0; i < input.length; i++) {
- t.setField(i, new DataAtom(new Integer(input[i]).toString()));
+ t.setField(i, input[i]);
}
return t;
}
@@ -42,7 +41,7 @@
}
static public Tuple loadNestTuple(Tuple t, int[] input) throws IOException {
- DataBag bag = new DataBag(Datum.DataType.TUPLE);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
for(int i = 0; i < input.length; i++) {
Tuple f = new Tuple(1);
f.setField(0, input[i]);
@@ -54,7 +53,7 @@
static public Tuple loadNestTuple(Tuple t, int[][] input) throws IOException {
for (int i = 0; i < input.length; i++) {
- DataBag bag = new DataBag(Datum.DataType.TUPLE);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
Tuple f = loadFlatTuple(new Tuple(input[i].length), input[i]);
bag.add(f);
t.setField(i, bag);
@@ -64,7 +63,7 @@
static public Tuple loadTuple(Tuple t, String[][] input) throws IOException {
for (int i = 0; i < input.length; i++) {
- DataBag bag = new DataBag(Datum.DataType.TUPLE);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
Tuple f = loadTuple(new Tuple(input[i].length), input[i]);
bag.add(f);
t.setField(i, bag);