You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@pig.apache.org by ut...@apache.org on 2007/12/08 01:55:30 UTC
svn commit: r602287 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/data/BigDataBag.java
src/org/apache/pig/impl/io/DataBagFileWriter.java
test/org/apache/pig/test/TestDataModel.java
Author: utkarsh
Date: Fri Dec 7 16:55:29 2007
New Revision: 602287
URL: http://svn.apache.org/viewvc?rev=602287&view=rev
Log:
PIG-44: Added adaptive decision of the number of records to hold in memory
before spilling
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Dec 7 16:55:29 2007
@@ -50,3 +50,6 @@
PIG-47: Added methods to DataMap to provide access to its content
PIG-12: Added time stamps to log4j messages (phunt via gates).
+
+ PIG-44: Added adaptive decision of the number of records to hold in memory
+ before spilling (utkarsh)
Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Fri Dec 7 16:55:29 2007
@@ -40,7 +40,9 @@
boolean finishedAdds = false,wantSorting = false, doneSorting = false, sortInProgress = false, wroteUnsortedFile = false;
int trueCount = 0;
-
+ int numRecordsToHoldInMemory = 1000;
+
+
boolean eliminateDuplicates = false;
EvalSpec spec = null;
@@ -50,7 +52,10 @@
* cause us to switch to disk backed mode
*/
public static long FREE_MEMORY_TO_MAINTAIN = (long)(MAX_MEMORY*.25);
-
+ /**
+ * want to hold roughly only 1% of max memory, once we are in a low memory condition
+ */
+ public static long TARGET_IN_MEMORY_SIZE = (long)(0.01 * MAX_MEMORY);
public BigDataBag(File tempdir) throws IOException {
this.tempdir = tempdir;
@@ -61,10 +66,16 @@
long usedMemory = Runtime.getRuntime().totalMemory() - freeMemory;
return MAX_MEMORY-usedMemory > memLimit;
}
-
+
+ private File getTempFile() throws IOException{
+ File store = File.createTempFile("bag",".dat",tempdir);
+ store.deleteOnExit();
+ return store;
+ }
+
private void writeContentToDisk() throws IOException{
if (writer==null){
- File store = File.createTempFile("bag",".dat",tempdir);
+ File store = getTempFile();
stores.add(store);
writer = new DataBagFileWriter(store);
}
@@ -76,7 +87,10 @@
}else{
wroteUnsortedFile = true;
}
- writer.write(content.iterator());
+ long bytesWritten = writer.write(content.iterator());
+ //Adjust the number of records to hold in memory so that what
+ //we hold in memory is about TARGET_IN_MEMORY_SIZE
+ numRecordsToHoldInMemory = (int)((numRecordsToHoldInMemory * TARGET_IN_MEMORY_SIZE) / bytesWritten);
super.clear();
if (wantSorting){
writer.close();
@@ -95,7 +109,7 @@
if (writer == null) {
//Want to add in memory
super.add(t);
- if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && trueCount > 10) {
+ if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && content.size() > numRecordsToHoldInMemory) {
writeContentToDisk();
}
}else{
@@ -133,7 +147,7 @@
Iterator<Tuple> iter = reader.content();
while(iter.hasNext()){
DataBag bag = new DataBag();
- while( iter.hasNext() && isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2)){
+ while( iter.hasNext() && (isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2) || bag.cardinality() < numRecordsToHoldInMemory)){
bag.add(iter.next());
}
if(eliminateDuplicates){
@@ -141,7 +155,7 @@
trueCount = bag.cardinality();
}else
bag.sort(spec);
- File f = File.createTempFile("bag", ".dat",tempdir);
+ File f = getTempFile();
stores.add(f);
DataBagFileWriter writer = new DataBagFileWriter(f);
writer.write(bag.content());
@@ -306,7 +320,7 @@
}
}
- File outputFile = File.createTempFile("bag",".dat",tempdir);
+ File outputFile = getTempFile();
stores.add(outputFile);
writer = new DataBagFileWriter(outputFile);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileWriter.java Fri Dec 7 16:55:29 2007
@@ -41,10 +41,20 @@
t.write(out);
}
- public void write(Iterator<Tuple> iter) throws IOException{
+ public long write(Iterator<Tuple> iter) throws IOException{
+
+ long initialSize = getFileLength();
while (iter.hasNext())
- iter.next().write(out);
+ iter.next().write(out);
+
+ return getFileLength() - initialSize;
}
+
+ public long getFileLength() throws IOException{
+ out.flush();
+ return store.length();
+ }
+
public void close() throws IOException{
flush();
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=602287&r1=602286&r2=602287&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Fri Dec 7 16:55:29 2007
@@ -19,7 +19,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@@ -216,176 +215,78 @@
public void testBigDataBagOnDisk() throws Exception{
Runtime.getRuntime().gc();
- testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000);
+ testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
}
+ private enum TestType {
+ PRE_SORT,
+ POST_SORT,
+ PRE_DISTINCT,
+ POST_DISTINCT,
+ NONE
+ }
+
private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception {
BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
- File tmp = File.createTempFile("test", "bag").getParentFile();
- BigDataBag bag = new BigDataBag(tmp);
- Iterator<Tuple> it;
- int count;
- String last;
-
Random r = new Random();
-
-
- //Basic test
- assertTrue(bag.isEmpty());
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(2);
- t.setField(0, Integer.toHexString(i));
- t.setField(1, i);
- bag.add(t);
- }
-
- assertFalse(bag.isEmpty());
-
- assertTrue(bag.cardinality() == numItems);
-
- int lastI = -1;
- it = bag.content();
- count = 0;
- while(it.hasNext()) {
- Tuple t = it.next();
- int ix = Integer.parseInt(t.getAtomField(0).strval(), 16);
- assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval()));
- assertEquals(lastI+1, ix);
- lastI = ix;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.distinct();
-
- bag.clear();
+
+ for (TestType testType: TestType.values()){
+ BigDataBag bag = BagFactory.getInstance().getNewBigBag();
+
+ assertTrue(bag.isEmpty());
+
+ if (testType == TestType.PRE_SORT)
+ bag.sort();
+ else if (testType == TestType.PRE_DISTINCT)
+ bag.distinct();
+
+ //generate data and add it to the bag
+ for(int i = 0; i < numItems; i++) {
+ Tuple t = new Tuple(1);
+ t.setField(0, r.nextInt(numItems));
+ bag.add(t);
+ }
+
+ assertFalse(bag.isEmpty());
+
+ if (testType == TestType.POST_SORT)
+ bag.sort();
+ else if (testType == TestType.POST_DISTINCT)
+ bag.distinct();
+
+
+ if (testType == TestType.NONE)
+ assertTrue(bag.cardinality() == numItems);
+ checkContents(bag, numItems, testType);
+ checkContents(bag, numItems, testType);
- //Test pre sort
-
- bag.sort();
-
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- t.setField(0, r.nextInt(100000));
- bag.add(t);
- }
-
- it = bag.content();
- count = 0;
- last= "";
- while(it.hasNext()) {
- Tuple t = it.next();
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<=0);
- last = next;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.clear();
-
-
- //Test post sort
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- t.setField(0, r.nextInt(100000));
- bag.add(t);
- }
+ }
+ }
+
+
+ private void checkContents(DataBag bag, int numItems, TestType testType) throws Exception{
+ String last = "";
- bag.sort();
DataBag.notifyInterval = 100;
- it = bag.content();
- count = 0;
- last= "";
- while(it.hasNext()) {
- Tuple t = it.next();
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<=0);
- last = next;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
- int cnt = numItems/DataBag.notifyInterval;
- assertTrue(bag.numNotifies >= cnt);
-
- bag.clear();
-
- //test post-distinct
-
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- //To get a lot of duplicates
- t.setField(0, r.nextInt(1000));
- bag.add(t);
- }
-
- bag.distinct();
-
- it = bag.content();
- count = 0;
- last= "";
+ Iterator<Tuple> it = bag.content();
+ int count = 0;
while(it.hasNext()) {
- Tuple t = it.next();
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<0);
+ Tuple t = it.next();
+ String next = t.getAtomField(0).strval();
+ if (testType == TestType.POST_SORT || testType == TestType.PRE_SORT)
+ assertTrue(last.compareTo(next)<=0);
+ else if (testType == TestType.POST_DISTINCT || testType == TestType.PRE_DISTINCT)
+ assertTrue(last.compareTo(next)<0);
last = next;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- bag.clear();
-
-
- //Test pre distinct
-
- bag.distinct();
-
-
- for(int i = 0; i < numItems; i++) {
- Tuple t = new Tuple(1);
- //To get a lot of duplicates
- t.setField(0, r.nextInt(10));
- bag.add(t);
+ count++;
}
-
- it = bag.content();
- count = 0;
- last= "";
- while(it.hasNext()) {
- Tuple t = it.next();
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<0);
- last = next;
- count++;
- }
-
- assertTrue(bag.cardinality() == count);
-
- //Check if it gives the correct contents the second time around
- it = bag.content();
- count = 0;
- last= "";
- while(it.hasNext()) {
- Tuple t = it.next();
- String next = t.getAtomField(0).strval();
- assertTrue(last.compareTo(next)<0);
- last = next;
- count++;
- }
-
assertTrue(bag.cardinality() == count);
- bag.clear();
+ if (testType != TestType.NONE)
+ assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
}
+
}