You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/01/22 22:17:22 UTC

svn commit: r614325 [6/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac...

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
@@ -73,7 +72,6 @@
         assertFalse(da1.compareTo(da2) > 0);
     }
 
-	/* Replaced by TestTuple.java
     @Test
     public void testTuple() throws Exception {
         int arity = 5;
@@ -160,9 +158,8 @@
         n1.appendTuple(n2);
         assertTrue(n1.arity() == n1Arity + n2Arity);
     }
-	*/
 
-	/* Replaced by TestDataBag.java 
+    /*
     @Test
     public void testDataBag() throws Exception {
         int[] input1 = { 1, 2, 3, 4, 5 };
@@ -197,8 +194,19 @@
             caught = true;
         }   
         assertTrue(caught);
+
+        // check that notifications are sent
+         b.clear();
+         DataBag.notifyInterval = 2;
+         Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1);
+         for (int i = 0; i < 10; i++) {
+             b.add(g);
+         }
+
+         Iterator it = b.content();
+         while (it.hasNext()) it.next();
+         assert(b.numNotifies == 5);
     }
-	*/
 
     @Test
     
@@ -208,198 +216,81 @@
 
     public void testBigDataBagOnDisk() throws Exception{
     	Runtime.getRuntime().gc();
-    	//testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000);
+    	testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
     }
+    */
 
+    private enum TestType {
+    	PRE_SORT,
+    	POST_SORT,
+    	PRE_DISTINCT,
+    	POST_DISTINCT,
+    	NONE
+    }
+       
     
+    /*
     private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception {
     	BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
-    	File tmp = File.createTempFile("test", "bag").getParentFile();
-        BigDataBag bag = new BigDataBag(Datum.DataType.TUPLE, tmp);
-        Iterator<Datum> it;
-        int count;
-        //String last;
-		Tuple lastT = null;
-    
         Random r = new Random();
-        
-        
-        //Basic test
-        assertTrue(bag.isEmpty());
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(2);
-            t.setField(0, Integer.toHexString(i));
-            t.setField(1, i);
-            bag.add(t);
-        }
-        
-        assertFalse(bag.isEmpty());
-
-        assertTrue(bag.cardinality() == numItems);
-        
-        int lastI = -1;
-        it = bag.content();
-        count = 0;
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-            int ix = Integer.parseInt(t.getAtomField(0).strval(), 16);
-            assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval()));
-            assertEquals(lastI+1, ix);
-            lastI = ix;
-            count++;
-        }
-        
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-
-        //Test pre sort
-        
-        bag.sort();
-        
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
-        
-        it = bag.content();
-        count = 0;
-        // last= "";
-		lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-			/*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-			*/
-			assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
-			lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-
-
-        //Test post sort
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
-        
-        bag.sort();
+   
+    	for (TestType testType: TestType.values()){
+    		BigDataBag bag = BagFactory.getInstance().getNewBigBag();
+
+            assertTrue(bag.isEmpty());
+
+            if (testType == TestType.PRE_SORT)
+            	bag.sort();
+            else if (testType == TestType.PRE_DISTINCT)
+            	bag.distinct();
+            
+            //generate data and add it to the bag
+            for(int i = 0; i < numItems; i++) {
+                Tuple t = new Tuple(1);
+                t.setField(0, r.nextInt(numItems));
+                bag.add(t);
+            }
+
+            assertFalse(bag.isEmpty());
+
+            if (testType == TestType.POST_SORT)
+            	bag.sort();
+            else if (testType == TestType.POST_DISTINCT)
+            	bag.distinct();
+
+            
+            if (testType == TestType.NONE)
+            	assertTrue(bag.cardinality() == numItems);
+            checkContents(bag, numItems, testType);
+            checkContents(bag, numItems, testType);
 
-        it = bag.content();
-        count = 0;
-        //last= "";
-		lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-			/*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-			*/
-			assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
-			lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-		
-        //test post-distinct
-        
-       
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(1000));
-            bag.add(t);
-        }
+    	}
+    }
+     
+    
+    private void checkContents(DataBag bag, int numItems, TestType testType) throws Exception{
+        String last = "";
         
+        DataBag.notifyInterval = 100;
         
-        bag.distinct();
-
-        it = bag.content();
-        count = 0;
-        //last= "";
-		lastT = new Tuple();
+        Iterator<Tuple> it = bag.content();
+        int count = 0;
         while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-			/*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
+        	Tuple t = it.next();
+        	String next = t.getAtomField(0).strval();
+        	if (testType == TestType.POST_SORT || testType == TestType.PRE_SORT)
+                assertTrue(last.compareTo(next)<=0);
+        	else if (testType == TestType.POST_DISTINCT || testType == TestType.PRE_DISTINCT)
+                assertTrue(last.compareTo(next)<0);
             last = next;
-			*/
-			assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
-			lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-		
-        
-        //Test pre distinct
-
-        bag.distinct();
-
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(10));
-            bag.add(t);
+        	count++;
         }
         
-
-        it = bag.content();
-        count = 0;
-        //last= "";
-		lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-			/*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-			*/
-			assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
-			lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-
-        //Check if it gives the correct contents the second time around
-        it = bag.content();
-        count = 0;
-        //last= "";
-		lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-			/*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-			*/
-			assertTrue("last should be <= next", lastT.compareTo(t) <= 0);
-			lastT = t;
-            count++;
-        }
-
         assertTrue(bag.cardinality() == count);
         
-        bag.clear();
+        if (testType != TestType.NONE)
+        	assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
     }
+    */
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Tue Jan 22 13:17:12 2008
@@ -15,50 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import org.junit.Test;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigServer;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.PigFile;
-
-import junit.framework.TestCase;
-
-public class TestEvalPipeline extends TestCase {
-	
-	String initString = "mapreduce";
-	
-	
-	static public class MyBagFunction extends EvalFunc<DataBag>{
-		@Override
-		public void exec(Tuple input, DataBag output) throws IOException {
-			output.add(new Tuple("a"));
-			output.add(new Tuple("a"));
-			output.add(new Tuple("a"));
-			
-		}
-	}
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.junit.Test;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.TextLoader;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.PigFile;
+
+import junit.framework.TestCase;
+
+public class TestEvalPipeline extends TestCase {
+	
+	String initString = "mapreduce";
+	
+	
+	static public class MyBagFunction extends EvalFunc<DataBag>{
+		@Override
+		public void exec(Tuple input, DataBag output) throws IOException {
+			output.add(new Tuple("a"));
+			output.add(new Tuple("a"));
+			output.add(new Tuple("a"));
+			
+		}
+	}
+	
 	
 	private File createFile(String[] data) throws Exception{
 		File f = File.createTempFile("tmp", "");
@@ -84,232 +83,230 @@
 			assertEquals(iter.next().getAtomField(0).numval(), 0.0);
 		}
 		
-	}
-	
-	@Test
-	public void testJoin() throws Exception{
-		PigServer pigServer = new PigServer(initString);
-		
-		File f1 = createFile(new String[]{"a:1","b:1","a:1"});
-		File f2 = createFile(new String[]{"b","b","a"});
-		
-		pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');");
-		pigServer.registerQuery("b = load 'file:" + f2 + "';");
-		pigServer.registerQuery("c = cogroup a by $0, b by $0;");		
-		pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
-		
-		Iterator<Tuple> iter = pigServer.openIterator("d");
-		int count = 0;
-		while(iter.hasNext()){
-			Tuple t = iter.next();
-			assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
-			count++;
-		}
-		assertEquals(count, 4);
-	}
-	
-	@Test
-	public void testDriverMethod() throws Exception{
-		PigServer pigServer = new PigServer(initString);
-		File f = File.createTempFile("tmp", "");
-		PrintWriter pw = new PrintWriter(f);
-		pw.println("a");
-		pw.println("a");
-		pw.close();
-		pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
-		pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
-		Iterator<Tuple> iter = pigServer.openIterator("a");
-		int count = 0;
-		while(iter.hasNext()){
-			Tuple t = iter.next();
-			assertTrue(t.getAtomField(0).strval().equals("1"));
-			assertTrue(t.getAtomField(1).strval().equals("a"));
-			count++;
-		}
-		assertEquals(count, 6);
-		f.delete();
-	}
-	
-	
-	/* Replaced by TestDataMap.java
-	@Test
-	public void testMapLookup() throws IOException{
-		PigServer pigServer = new PigServer(initString);
-		DataBag b = new DataBag();
-		DataMap colors = new DataMap();
-		colors.put("apple","red");
-		colors.put("orange","orange");
-		
-		DataMap weights = new DataMap();
-		weights.put("apple","0.1");
-		weights.put("orange","0.3");
-		
-		Tuple t = new Tuple();
-		t.appendField(colors);
-		t.appendField(weights);
-		b.add(t);
-		
-		String fileName = "file:"+File.createTempFile("tmp", "");
-		PigFile f = new PigFile(fileName);
-		f.store(b, new BinStorage(), pigServer.getPigContext());
-		
-		
-		pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();");
-		pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
-		Iterator<Tuple> iter = pigServer.openIterator("b");
-		t = iter.next();
-		assertEquals(t.getAtomField(0).strval(), "red");
-		assertEquals(t.getAtomField(1).numval(), 0.3);
-		assertFalse(iter.hasNext());
-	}
-	*/
-	
-	
-	static public class TitleNGrams extends EvalFunc<DataBag> {
-		
-		@Override
-		public void exec(Tuple input, DataBag output) throws IOException {	
-		    String str = input.getAtomField(0).strval();
-			
-			String title = str;
-
-			if (title != null) {
-				List<String> nGrams = makeNGrams(title);
-				
-				for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
-					Tuple t = new Tuple(1);
-					t.setField(0, it.next());
-					output.add(t);
-				}
-			}
-	    }
-		
-		
-		List<String> makeNGrams(String str) {
-			List<String> tokens = new ArrayList<String>();
-			
-			StringTokenizer st = new StringTokenizer(str);
-			while (st.hasMoreTokens())
-				tokens.add(st.nextToken());
-			
-			return nGramHelper(tokens, new ArrayList<String>());
-		}
-		
-		ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
-			if (str.size() == 0)
-				return nGrams;
-			
-			for (int i = 0; i < str.size(); i++)
-				nGrams.add(makeString(str.subList(0, i+1)));
-			
-			return nGramHelper(str.subList(1, str.size()), nGrams);
-		}
-		
-		String makeString(List<String> list) {
-			StringBuffer sb = new StringBuffer();
-			for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
-				sb.append(it.next());
-				if (it.hasNext())
-					sb.append(" ");
-			}
-			return sb.toString();
-		}
-	}
-
-	
-	
-	
-	@Test
-	public void testBagFunctionWithFlattening() throws Exception{
-		PigServer pigServer = new PigServer(initString);
-		File queryLogFile = createFile(
-					new String[]{ 
-						"stanford\tdeer\tsighting",
-						"bush\tpresident",
-						"stanford\tbush",
-						"conference\tyahoo",
-						"world\tcup\tcricket",
-						"bush\twins",
-						"stanford\tpresident",
-					}
-				);
-				
-		File newsFile = createFile(
-					new String[]{
-						"deer seen at stanford",
-						"george bush visits stanford", 
-						"yahoo hosting a conference in the bay area", 
-						"who will win the world cup"
-					}
-				);	
-		
-		Map<String, Integer> expectedResults = new HashMap<String, Integer>();
-		expectedResults.put("bush", 2);
-		expectedResults.put("stanford", 3);
-		expectedResults.put("world", 1);
-		expectedResults.put("conference", 1);
-		
-		pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile + "' USING " + TextLoader.class.getName() + "();");
-	    pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + "';");
-
-	    pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
-	    pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
-	    pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
-		
-	    Iterator<Tuple> iter = pigServer.openIterator("answer");
-	    while(iter.hasNext()){
-	    	Tuple t = iter.next();
-	    	assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
-	    }
-	}
-	
-
-	
-	@Test
-	public void testSort() throws Exception{
-		testSortDistinct(false);
-	}
-	
-
-	@Test
-	public void testDistinct() throws Exception{
-		testSortDistinct(true);
-	}
-
-	private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
-		int LOOP_SIZE = 1024*16;
-		File tmpFile = File.createTempFile("test", "txt");
-        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
-        Random r = new Random();
-        for(int i = 0; i < LOOP_SIZE; i++) {
-            ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
-        }
-        ps.close(); 
-		
-		PigServer pig = new PigServer(initString);
-        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString();
-		pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
-		if (eliminateDuplicates){
-			pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
-		}else{
-			pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
-		}
-		pig.store("B", tmpOutputFile);
-		
-		pig.registerQuery("A = load '" + tmpOutputFile + "';");
-		Iterator<Tuple> iter = pig.openIterator("A");
-		int last = -1;
-		while (iter.hasNext()){
-			Tuple t = iter.next();
-			if (eliminateDuplicates){
-				assertTrue(last < t.getAtomField(0).numval().intValue());
-			}else{
-				assertTrue(last <= t.getAtomField(0).numval().intValue());
-				assertEquals(t.arity(), 2);
-			}
-		}
-		
-	}
-	
-
-}
+	}
+	
+	@Test
+	public void testJoin() throws Exception{
+		PigServer pigServer = new PigServer(initString);
+		
+		File f1 = createFile(new String[]{"a:1","b:1","a:1"});
+		File f2 = createFile(new String[]{"b","b","a"});
+		
+		pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');");
+		pigServer.registerQuery("b = load 'file:" + f2 + "';");
+		pigServer.registerQuery("c = cogroup a by $0, b by $0;");		
+		pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");
+		
+		Iterator<Tuple> iter = pigServer.openIterator("d");
+		int count = 0;
+		while(iter.hasNext()){
+			Tuple t = iter.next();
+			assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
+			count++;
+		}
+		assertEquals(count, 4);
+	}
+	
+	@Test
+	public void testDriverMethod() throws Exception{
+		PigServer pigServer = new PigServer(initString);
+		File f = File.createTempFile("tmp", "");
+		PrintWriter pw = new PrintWriter(f);
+		pw.println("a");
+		pw.println("a");
+		pw.close();
+		pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
+		pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
+		Iterator<Tuple> iter = pigServer.openIterator("a");
+		int count = 0;
+		while(iter.hasNext()){
+			Tuple t = iter.next();
+			assertTrue(t.getAtomField(0).strval().equals("1"));
+			assertTrue(t.getAtomField(1).strval().equals("a"));
+			count++;
+		}
+		assertEquals(count, 6);
+		f.delete();
+	}
+	
+	
+	@Test
+	public void testMapLookup() throws IOException{
+		PigServer pigServer = new PigServer(initString);
+		DataBag b = BagFactory.getInstance().newDefaultBag();
+		DataMap colors = new DataMap();
+		colors.put("apple","red");
+		colors.put("orange","orange");
+		
+		DataMap weights = new DataMap();
+		weights.put("apple","0.1");
+		weights.put("orange","0.3");
+		
+		Tuple t = new Tuple();
+		t.appendField(colors);
+		t.appendField(weights);
+		b.add(t);
+		
+		String fileName = "file:"+File.createTempFile("tmp", "");
+		PigFile f = new PigFile(fileName);
+		f.store(b, new BinStorage(), pigServer.getPigContext());
+		
+		
+		pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();");
+		pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
+		Iterator<Tuple> iter = pigServer.openIterator("b");
+		t = iter.next();
+		assertEquals(t.getAtomField(0).strval(), "red");
+		assertEquals(t.getAtomField(1).numval(), 0.3);
+		assertFalse(iter.hasNext());
+	}
+	
+	
+	static public class TitleNGrams extends EvalFunc<DataBag> {
+		
+		@Override
+		public void exec(Tuple input, DataBag output) throws IOException {	
+		    String str = input.getAtomField(0).strval();
+			
+			String title = str;
+
+			if (title != null) {
+				List<String> nGrams = makeNGrams(title);
+				
+				for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
+					Tuple t = new Tuple(1);
+					t.setField(0, it.next());
+					output.add(t);
+				}
+			}
+	    }
+		
+		
+		List<String> makeNGrams(String str) {
+			List<String> tokens = new ArrayList<String>();
+			
+			StringTokenizer st = new StringTokenizer(str);
+			while (st.hasMoreTokens())
+				tokens.add(st.nextToken());
+			
+			return nGramHelper(tokens, new ArrayList<String>());
+		}
+		
+		ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
+			if (str.size() == 0)
+				return nGrams;
+			
+			for (int i = 0; i < str.size(); i++)
+				nGrams.add(makeString(str.subList(0, i+1)));
+			
+			return nGramHelper(str.subList(1, str.size()), nGrams);
+		}
+		
+		String makeString(List<String> list) {
+			StringBuffer sb = new StringBuffer();
+			for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
+				sb.append(it.next());
+				if (it.hasNext())
+					sb.append(" ");
+			}
+			return sb.toString();
+		}
+	}
+
+	
+	
+	
+	@Test
+	public void testBagFunctionWithFlattening() throws Exception{
+		PigServer pigServer = new PigServer(initString);
+		File queryLogFile = createFile(
+					new String[]{ 
+						"stanford\tdeer\tsighting",
+						"bush\tpresident",
+						"stanford\tbush",
+						"conference\tyahoo",
+						"world\tcup\tcricket",
+						"bush\twins",
+						"stanford\tpresident",
+					}
+				);
+				
+		File newsFile = createFile(
+					new String[]{
+						"deer seen at stanford",
+						"george bush visits stanford", 
+						"yahoo hosting a conference in the bay area", 
+						"who will win the world cup"
+					}
+				);	
+		
+		Map<String, Integer> expectedResults = new HashMap<String, Integer>();
+		expectedResults.put("bush", 2);
+		expectedResults.put("stanford", 3);
+		expectedResults.put("world", 1);
+		expectedResults.put("conference", 1);
+		
+		pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile + "' USING " + TextLoader.class.getName() + "();");
+	    pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + "';");
+
+	    pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
+	    pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
+	    pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
+		
+	    Iterator<Tuple> iter = pigServer.openIterator("answer");
+	    while(iter.hasNext()){
+	    	Tuple t = iter.next();
+	    	assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
+	    }
+	}
+	
+
+	
+	@Test
+	public void testSort() throws Exception{
+		testSortDistinct(false);
+	}
+	
+
+	@Test
+	public void testDistinct() throws Exception{
+		testSortDistinct(true);
+	}
+
+	private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
+		int LOOP_SIZE = 1024*16;
+		File tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        Random r = new Random();
+        for(int i = 0; i < LOOP_SIZE; i++) {
+            ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
+        }
+        ps.close(); 
+		
+		PigServer pig = new PigServer(initString);
+        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString();
+		pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+		if (eliminateDuplicates){
+			pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
+		}else{
+			pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+		}
+		pig.store("B", tmpOutputFile);
+		
+		pig.registerQuery("A = load '" + tmpOutputFile + "';");
+		Iterator<Tuple> iter = pig.openIterator("A");
+		int last = -1;
+		while (iter.hasNext()){
+			Tuple t = iter.next();
+			if (eliminateDuplicates){
+				assertTrue(last < t.getAtomField(0).numval().intValue());
+			}else{
+				assertTrue(last <= t.getAtomField(0).numval().intValue());
+				assertEquals(t.arity(), 2);
+			}
+		}
+		
+	}
+	
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Jan 22 13:17:12 2008
@@ -78,9 +78,9 @@
     	}
         @Override
 		public void exec(Tuple input, DataBag output) throws IOException {
-            Iterator<Datum> it = (input.getBagField(0)).content();
+            Iterator<Tuple> it = (input.getBagField(0)).iterator();
             while(it.hasNext()) {
-                Tuple t = (Tuple)it.next();
+                Tuple t = it.next();
                 Tuple newT = new Tuple(2);
                 newT.setField(0, field0);
                 newT.setField(1, t.getField(0).toString());

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java Tue Jan 22 13:17:12 2008
@@ -33,18 +33,14 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.PigContext;
 
 public class TestPigFile extends TestCase {
 
-    DataBag bag          = new DataBag(Datum.DataType.TUPLE);
+    DataBag bag          = BagFactory.getInstance().newDefaultBag();
     Random rand = new Random();
     
     @Override
@@ -89,13 +85,13 @@
         DataBag loaded = load.load(new PigStorage(), pigContext);
         System.out.println("Done.");
 
-        assertTrue(bag.cardinality() == loaded.cardinality());
+        assertTrue(bag.size() == loaded.size());
 
-        Iterator<Datum> it1 = bag.content();
-        Iterator<Datum> it2 = loaded.content();
+        Iterator<Tuple> it1 = bag.iterator();
+        Iterator<Tuple> it2 = loaded.iterator();
         while (it1.hasNext() && it2.hasNext()) {
-            Tuple f1 = (Tuple)it1.next();
-            Tuple f2 = (Tuple)it2.next();
+            Tuple f1 = it1.next();
+            Tuple f2 = it2.next();
             assertTrue(f1.equals(f2));
         }
         assertFalse(it1.hasNext() || it2.hasNext());
@@ -131,7 +127,7 @@
     
     private DataBag getRandomBag(int maxCardinality, int nestingLevel) throws IOException{
     	int cardinality = rand.nextInt(maxCardinality)+1;
-    	DataBag b = new DataBag(Datum.DataType.TUPLE);
+    	DataBag b = BagFactory.getInstance().newDefaultBag();
     	for (int i=0; i<cardinality; i++){
     		Tuple t = getRandomTuple(nestingLevel+1); 
     		b.add(t);
@@ -168,14 +164,13 @@
         DataBag loaded = load.load(new BinStorage(), pigContext);
         System.out.println("Done.");
 
-        assertTrue(bag.cardinality() == loaded.cardinality());
+        assertTrue(bag.size() == loaded.size());
 
-        Iterator<Datum> it1 = bag.content();
-        Iterator<Datum> it2 = loaded.content();
-        //while (it1.hasNext() && it2.hasNext()) {
-        for (int i = 0; it1.hasNext() && it2.hasNext(); i++) {
-            Tuple f1 = (Tuple)it1.next();
-            Tuple f2 = (Tuple)it2.next();
+        Iterator<Tuple> it1 = bag.iterator();
+        Iterator<Tuple> it2 = loaded.iterator();
+        while (it1.hasNext() && it2.hasNext()) {
+            Tuple f1 = it1.next();
+            Tuple f2 = it2.next();
             assertTrue(f1.equals(f2));
         }
         assertFalse(it1.hasNext() || it2.hasNext());

Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Tue Jan 22 13:17:12 2008
@@ -19,8 +19,7 @@
 
 import java.io.IOException;
 
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.DataAtom;
 
@@ -29,7 +28,7 @@
     // =================
     static public Tuple loadFlatTuple(Tuple t, int[] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            t.setField(i, new DataAtom(new Integer(input[i]).toString()));
+            t.setField(i, input[i]);
         }
         return t;
     }
@@ -42,7 +41,7 @@
     }
 
     static public Tuple loadNestTuple(Tuple t, int[] input) throws IOException {
-        DataBag bag = new DataBag(Datum.DataType.TUPLE);
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
         for(int i = 0; i < input.length; i++) {
             Tuple f = new Tuple(1);
             f.setField(0, input[i]);
@@ -54,7 +53,7 @@
 
     static public Tuple loadNestTuple(Tuple t, int[][] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            DataBag bag = new DataBag(Datum.DataType.TUPLE);
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
             Tuple f = loadFlatTuple(new Tuple(input[i].length), input[i]);
             bag.add(f);
             t.setField(i, bag);
@@ -64,7 +63,7 @@
 
     static public Tuple loadTuple(Tuple t, String[][] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            DataBag bag = new DataBag(Datum.DataType.TUPLE);
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
             Tuple f = loadTuple(new Tuple(input[i].length), input[i]);
             bag.add(f);
             t.setField(i, bag);