You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this rendering.
Posted to commits@pig.apache.org by ol...@apache.org on 2007/11/29 02:40:54 UTC

svn commit: r599242 - in /incubator/pig/trunk: CHANGES.txt src/org/apache/pig/data/BigDataBag.java src/org/apache/pig/data/DataBag.java src/org/apache/pig/impl/io/DataBagFileReader.java test/org/apache/pig/test/TestDataModel.java

Author: olga
Date: Wed Nov 28 17:40:53 2007
New Revision: 599242

URL: http://svn.apache.org/viewvc?rev=599242&view=rev
Log:
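PIG-14: added heartbeat functionality (periodic progress reports via PigMapReduce.reporter while iterating over, merging, and reading spilled data bags)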

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
    incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
    incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed Nov 28 17:40:53 2007
@@ -29,3 +29,5 @@
 	PIG-33 Help was commented out - uncommented (olgan)
 
 	PIG-31: second half of concurrent mode problem addressed (olgan)
+
+	PIG-14: added heartbeat functionality (olgan)

Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Wed Nov 28 17:40:53 2007
@@ -29,6 +29,7 @@
 import org.apache.pig.impl.eval.StarSpec;
 import org.apache.pig.impl.io.DataBagFileReader;
 import org.apache.pig.impl.io.DataBagFileWriter;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 
 public class BigDataBag extends DataBag {
@@ -62,7 +63,6 @@
     }
         
     private void writeContentToDisk() throws IOException{
-    	
     	if (writer==null){
 			File store = File.createTempFile("bag",".dat",tempdir);
 			stores.add(store);
@@ -82,7 +82,6 @@
         	writer.close();
         	writer = null;
         }
-        
     }
     
     @Override
@@ -177,7 +176,6 @@
     
     @Override
 	public Iterator<Tuple> content() {
-    	
     	if (sortInProgress)
     		throw new RuntimeException("Cannot open another iterator: a sort is in progress");
     	
@@ -272,11 +270,12 @@
     private class FileMerger implements Iterator<Tuple>{
     	PriorityQueue<HeapEntry> heap;
     	private final int FANIN_LIMIT = 25;
+        int curCall;
     	DataBagFileWriter writer;
     	HeapEntry nextEntry;
     	
     	public FileMerger() throws IOException{
-        	
+            numNotifies = 0;
     		Comparator<HeapEntry> comp = new Comparator<HeapEntry>(){
         		public int compare(HeapEntry he1, HeapEntry he2){
 				try
@@ -317,6 +316,14 @@
     	}
     	
     	private void getNextEntry() throws IOException{
+            if (curCall < notifyInterval - 1)
+                curCall ++;
+            else{
+                if (PigMapReduce.reporter != null)
+                    PigMapReduce.reporter.progress();
+                curCall = 0;
+                numNotifies ++;
+            }
     		if (heap.isEmpty()){
         		nextEntry = null;
         		writer.close();

Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Wed Nov 28 17:40:53 2007
@@ -27,6 +27,7 @@
 
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 
 /**
@@ -114,9 +115,18 @@
         
         Collections.sort(content);
         isSorted = true;
+        int curCall = 0;
         
         Tuple lastTup = null;
         for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) {
+            if (curCall < notifyInterval - 1)
+                curCall++;
+            else
+            {
+                    if (PigMapReduce.reporter != null)
+                        PigMapReduce.reporter.progress(); 
+                    curCall = 0;
+            }
             Tuple thisTup = it.next();
             
             if (lastTup == null) {
@@ -132,8 +142,37 @@
         }
     }
 
+    public static int notifyInterval = 1000;
+    public int numNotifies; // used for unit tests only
+
     public Iterator<Tuple> content() {
-        return content.iterator();
+        return new Iterator<Tuple>() {
+             Iterator<Tuple> myIt;
+             int curCall;
+
+            {
+                numNotifies = 0;
+                myIt = content.iterator();
+
+            }
+            public final boolean hasNext(){
+                return myIt.hasNext();
+            }
+            public final Tuple next(){
+                if (curCall < notifyInterval - 1)
+                    curCall ++;
+                else{
+                    if (PigMapReduce.reporter != null)
+                        PigMapReduce.reporter.progress(); 
+                    numNotifies ++;
+                    curCall = 0;
+                } 
+                return myIt.next();
+            }
+            public final void remove(){
+                myIt.remove();
+            }
+        };
     }
     
 

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java Wed Nov 28 17:40:53 2007
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 
 public class DataBagFileReader {
@@ -35,16 +36,29 @@
 		store = f;
 	}
 	
+    public static int notifyInterval = 1000;
+    public int numNotifies;
 	private class myIterator implements Iterator<Tuple>{
 		DataInputStream in;
 		Tuple nextTuple;
+        int curCall;
 		
 		public myIterator() throws IOException{
+            numNotifies = 0;
 			in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
 			getNextTuple();
 		}
 		
 		private void getNextTuple() throws IOException{
+            if (curCall < notifyInterval - 1)
+                curCall ++;
+            else{
+                if (PigMapReduce.reporter != null)
+                    PigMapReduce.reporter.progress();
+                curCall = 0;
+                numNotifies ++;
+            }
+
 			try{
 				nextTuple = new Tuple();
 		        nextTuple.readFields(in);

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Wed Nov 28 17:40:53 2007
@@ -194,6 +194,18 @@
             caught = true;
         }   
         assertTrue(caught);
+
+        // check that notifications are sent
+         b.clear();
+         DataBag.notifyInterval = 2;
+         Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1);
+         for (int i = 0; i < 10; i++) {
+             b.add(g);
+         }
+
+         Iterator it = b.content();
+         while (it.hasNext()) it.next();
+         assert(b.numNotifies == 5);
     }
 
     @Test
@@ -287,7 +299,7 @@
         }
         
         bag.sort();
-
+        DataBag.notifyInterval = 100;
         it = bag.content();
         count = 0;
         last= "";
@@ -300,6 +312,8 @@
         }
 
         assertTrue(bag.cardinality() == count);
+        int cnt = numItems/DataBag.notifyInterval;
+        assertTrue(bag.numNotifies >= cnt);
         
         bag.clear();