You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain text copy.
Posted to commits@pig.apache.org by ut...@apache.org on 2007/12/08 01:55:30 UTC

svn commit: r602287 - in /incubator/pig/trunk: CHANGES.txt src/org/apache/pig/data/BigDataBag.java src/org/apache/pig/impl/io/DataBagFileWriter.java test/org/apache/pig/test/TestDataModel.java

Author: utkarsh
Date: Fri Dec  7 16:55:29 2007
New Revision: 602287

URL: http://svn.apache.org/viewvc?rev=602287&view=rev
Log:
PIG-44: Added adaptive decision of the number of records to hold in memory 
        before spilling

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
    incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Dec  7 16:55:29 2007
@@ -50,3 +50,6 @@
     PIG-47: Added methods to DataMap to provide access to its content
 
 	PIG-12: Added time stamps to log4j messages (phunt via gates).
+
+	PIG-44: Added adaptive decision of the number of records to hold in memory 
+	before spilling (utkarsh)

Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Fri Dec  7 16:55:29 2007
@@ -40,7 +40,9 @@
     
     boolean finishedAdds = false,wantSorting = false, doneSorting = false, sortInProgress = false, wroteUnsortedFile = false;
     int trueCount = 0;
-
+    int numRecordsToHoldInMemory = 1000;
+    
+    
     boolean eliminateDuplicates = false;
     EvalSpec spec = null;
     
@@ -50,7 +52,10 @@
      * cause us to switch to disk backed mode
      */
     public static long FREE_MEMORY_TO_MAINTAIN = (long)(MAX_MEMORY*.25);
-    
+    /**
+     * want to hold roughly only 1% of max memory, once we are in a low memory condition
+     */
+    public static long TARGET_IN_MEMORY_SIZE = (long)(0.01 * MAX_MEMORY);
     
     public BigDataBag(File tempdir) throws IOException {
     	this.tempdir = tempdir;
@@ -61,10 +66,16 @@
 	long usedMemory = Runtime.getRuntime().totalMemory() - freeMemory;
     	return MAX_MEMORY-usedMemory > memLimit;
     }
-        
+    
+    private File getTempFile() throws IOException{
+		File store = File.createTempFile("bag",".dat",tempdir);
+		store.deleteOnExit();
+		return store;
+    }
+    
     private void writeContentToDisk() throws IOException{
     	if (writer==null){
-			File store = File.createTempFile("bag",".dat",tempdir);
+			File store = getTempFile();
 			stores.add(store);
 			writer = new DataBagFileWriter(store);
 		}
@@ -76,7 +87,10 @@
     	}else{
     		wroteUnsortedFile = true;
     	}
-    	writer.write(content.iterator());
+    	long bytesWritten = writer.write(content.iterator());
+    	//Adjust the number of records to hold in memory so that what 
+    	//we hold in memory is about TARGET_IN_MEMORY_SIZE
+    	numRecordsToHoldInMemory = (int)((numRecordsToHoldInMemory * TARGET_IN_MEMORY_SIZE) / bytesWritten);
     	super.clear();
     	if (wantSorting){
         	writer.close();
@@ -95,7 +109,7 @@
 	        if (writer == null) {
 	        	//Want to add in memory
 	        	super.add(t);
-	            if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && trueCount > 10) {
+	            if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && content.size() > numRecordsToHoldInMemory) {
 	            	writeContentToDisk();
 	            }	
 	        }else{
@@ -133,7 +147,7 @@
     	Iterator<Tuple> iter = reader.content();
     	while(iter.hasNext()){
     		DataBag bag = new DataBag();
-    		while( iter.hasNext() && isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2)){
+    		while( iter.hasNext() && (isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2) || bag.cardinality() < numRecordsToHoldInMemory)){
     			bag.add(iter.next());
     		}
     		if(eliminateDuplicates){
@@ -141,7 +155,7 @@
     			trueCount = bag.cardinality();
     		}else
     			bag.sort(spec);
-    		File f = File.createTempFile("bag", ".dat",tempdir);
+    		File f = getTempFile();
     		stores.add(f);
     		DataBagFileWriter writer = new DataBagFileWriter(f);
     		writer.write(bag.content());
@@ -306,7 +320,7 @@
         		}
         	}
         	
-        	File outputFile  = File.createTempFile("bag",".dat",tempdir);
+        	File outputFile  = getTempFile();
         	stores.add(outputFile);
         	writer = new DataBagFileWriter(outputFile);
         	

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java Fri Dec  7 16:55:29 2007
@@ -41,10 +41,20 @@
 		t.write(out);
 	}
 	
-	public void write(Iterator<Tuple> iter) throws IOException{
+	public long write(Iterator<Tuple> iter) throws IOException{
+	
+		long initialSize = getFileLength();
 		while (iter.hasNext())
-			iter.next().write(out);
+			iter.next().write(out);
+		
+		return getFileLength() - initialSize;
 	}
+	
+	public long getFileLength() throws IOException{
+		out.flush();
+		return store.length();
+	}
+	
 	
 	public void close() throws IOException{
 		flush();

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Fri Dec  7 16:55:29 2007
@@ -19,7 +19,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
@@ -216,176 +215,78 @@
 
     public void testBigDataBagOnDisk() throws Exception{
     	Runtime.getRuntime().gc();
-    	testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000);
+    	testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
     }
 
+    private enum TestType {
+    	PRE_SORT,
+    	POST_SORT,
+    	PRE_DISTINCT,
+    	POST_DISTINCT,
+    	NONE
+    }
+       
     
     private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception {
     	BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
-    	File tmp = File.createTempFile("test", "bag").getParentFile();
-        BigDataBag bag = new BigDataBag(tmp);
-        Iterator<Tuple> it;
-        int count;
-        String last;
-    
         Random r = new Random();
-        
-        
-        //Basic test
-        assertTrue(bag.isEmpty());
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(2);
-            t.setField(0, Integer.toHexString(i));
-            t.setField(1, i);
-            bag.add(t);
-        }
-        
-        assertFalse(bag.isEmpty());
-
-        assertTrue(bag.cardinality() == numItems);
-        
-        int lastI = -1;
-        it = bag.content();
-        count = 0;
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            int ix = Integer.parseInt(t.getAtomField(0).strval(), 16);
-            assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval()));
-            assertEquals(lastI+1, ix);
-            lastI = ix;
-            count++;
-        }
-        
-        assertTrue(bag.cardinality() == count);
-        
-        bag.distinct();
-
-        bag.clear();
+   
+    	for (TestType testType: TestType.values()){
+    		BigDataBag bag = BagFactory.getInstance().getNewBigBag();
+
+            assertTrue(bag.isEmpty());
+
+            if (testType == TestType.PRE_SORT)
+            	bag.sort();
+            else if (testType == TestType.PRE_DISTINCT)
+            	bag.distinct();
+            
+            //generate data and add it to the bag
+            for(int i = 0; i < numItems; i++) {
+                Tuple t = new Tuple(1);
+                t.setField(0, r.nextInt(numItems));
+                bag.add(t);
+            }
+
+            assertFalse(bag.isEmpty());
+
+            if (testType == TestType.POST_SORT)
+            	bag.sort();
+            else if (testType == TestType.POST_DISTINCT)
+            	bag.distinct();
+
+            
+            if (testType == TestType.NONE)
+            	assertTrue(bag.cardinality() == numItems);
+            checkContents(bag, numItems, testType);
+            checkContents(bag, numItems, testType);
 
-        //Test pre sort
-        
-        bag.sort();
-        
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
-        
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-
-
-        //Test post sort
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
+    	}
+    }
+     
+    
+    private void checkContents(DataBag bag, int numItems, TestType testType) throws Exception{
+        String last = "";
         
-        bag.sort();
         DataBag.notifyInterval = 100;
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        int cnt = numItems/DataBag.notifyInterval;
-        assertTrue(bag.numNotifies >= cnt);
-        
-        bag.clear();
-		
-        //test post-distinct
-        
-       
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(1000));
-            bag.add(t);
-        }
         
-        
-        bag.distinct();
-
-        it = bag.content();
-        count = 0;
-        last= "";
+        Iterator<Tuple> it = bag.content();
+        int count = 0;
         while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
+        	Tuple t = it.next();
+        	String next = t.getAtomField(0).strval();
+        	if (testType == TestType.POST_SORT || testType == TestType.PRE_SORT)
+                assertTrue(last.compareTo(next)<=0);
+        	else if (testType == TestType.POST_DISTINCT || testType == TestType.PRE_DISTINCT)
+                assertTrue(last.compareTo(next)<0);
             last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-		
-        
-        //Test pre distinct
-
-        bag.distinct();
-
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(10));
-            bag.add(t);
+        	count++;
         }
         
-
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-
-        //Check if it gives the correct contents the second time around
-        it = bag.content();
-        count = 0;
-        last= "";
-        while(it.hasNext()) {
-            Tuple t = it.next();
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-            count++;
-        }
-
         assertTrue(bag.cardinality() == count);
         
-        bag.clear();
+        if (testType != TestType.NONE)
+        	assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
     }
+
 }