You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2007/11/29 02:40:54 UTC
svn commit: r599242 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/data/BigDataBag.java src/org/apache/pig/data/DataBag.java
src/org/apache/pig/impl/io/DataBagFileReader.java
test/org/apache/pig/test/TestDataModel.java
Author: olga
Date: Wed Nov 28 17:40:53 2007
New Revision: 599242
URL: http://svn.apache.org/viewvc?rev=599242&view=rev
Log:
PIG-14: added heartbeat functionality
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed Nov 28 17:40:53 2007
@@ -29,3 +29,5 @@
PIG-33 Help was commented out - uncommented (olgan)
PIG-31: second half of concurrent mode problem addressed (olgan)
+
+ PIG-14: added heartbeat functionality (olgan)
Modified: incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/BigDataBag.java Wed Nov 28 17:40:53 2007
@@ -29,6 +29,7 @@
import org.apache.pig.impl.eval.StarSpec;
import org.apache.pig.impl.io.DataBagFileReader;
import org.apache.pig.impl.io.DataBagFileWriter;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
public class BigDataBag extends DataBag {
@@ -62,7 +63,6 @@
}
private void writeContentToDisk() throws IOException{
-
if (writer==null){
File store = File.createTempFile("bag",".dat",tempdir);
stores.add(store);
@@ -82,7 +82,6 @@
writer.close();
writer = null;
}
-
}
@Override
@@ -177,7 +176,6 @@
@Override
public Iterator<Tuple> content() {
-
if (sortInProgress)
throw new RuntimeException("Cannot open another iterator: a sort is in progress");
@@ -272,11 +270,12 @@
private class FileMerger implements Iterator<Tuple>{
PriorityQueue<HeapEntry> heap;
private final int FANIN_LIMIT = 25;
+ int curCall;
DataBagFileWriter writer;
HeapEntry nextEntry;
public FileMerger() throws IOException{
-
+ numNotifies = 0;
Comparator<HeapEntry> comp = new Comparator<HeapEntry>(){
public int compare(HeapEntry he1, HeapEntry he2){
try
@@ -317,6 +316,14 @@
}
private void getNextEntry() throws IOException{
+ if (curCall < notifyInterval - 1)
+ curCall ++;
+ else{
+ if (PigMapReduce.reporter != null)
+ PigMapReduce.reporter.progress();
+ curCall = 0;
+ numNotifies ++;
+ }
if (heap.isEmpty()){
nextEntry = null;
writer.close();
Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Wed Nov 28 17:40:53 2007
@@ -27,6 +27,7 @@
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
/**
@@ -114,9 +115,18 @@
Collections.sort(content);
isSorted = true;
+ int curCall = 0;
Tuple lastTup = null;
for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) {
+ if (curCall < notifyInterval - 1)
+ curCall++;
+ else
+ {
+ if (PigMapReduce.reporter != null)
+ PigMapReduce.reporter.progress();
+ curCall = 0;
+ }
Tuple thisTup = it.next();
if (lastTup == null) {
@@ -132,8 +142,37 @@
}
}
+ public static int notifyInterval = 1000;
+ public int numNotifies; // used for unit tests only
+
public Iterator<Tuple> content() {
- return content.iterator();
+ return new Iterator<Tuple>() {
+ Iterator<Tuple> myIt;
+ int curCall;
+
+ {
+ numNotifies = 0;
+ myIt = content.iterator();
+
+ }
+ public final boolean hasNext(){
+ return myIt.hasNext();
+ }
+ public final Tuple next(){
+ if (curCall < notifyInterval - 1)
+ curCall ++;
+ else{
+ if (PigMapReduce.reporter != null)
+ PigMapReduce.reporter.progress();
+ numNotifies ++;
+ curCall = 0;
+ }
+ return myIt.next();
+ }
+ public final void remove(){
+ myIt.remove();
+ }
+ };
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/DataBagFileReader.java Wed Nov 28 17:40:53 2007
@@ -26,6 +26,7 @@
import java.util.Iterator;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.mapreduceExec.PigMapReduce;
public class DataBagFileReader {
@@ -35,16 +36,29 @@
store = f;
}
+ public static int notifyInterval = 1000;
+ public int numNotifies;
private class myIterator implements Iterator<Tuple>{
DataInputStream in;
Tuple nextTuple;
+ int curCall;
public myIterator() throws IOException{
+ numNotifies = 0;
in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
getNextTuple();
}
private void getNextTuple() throws IOException{
+ if (curCall < notifyInterval - 1)
+ curCall ++;
+ else{
+ if (PigMapReduce.reporter != null)
+ PigMapReduce.reporter.progress();
+ curCall = 0;
+ numNotifies ++;
+ }
+
try{
nextTuple = new Tuple();
nextTuple.readFields(in);
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java?rev=599242&r1=599241&r2=599242&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestDataModel.java Wed Nov 28 17:40:53 2007
@@ -194,6 +194,18 @@
caught = true;
}
assertTrue(caught);
+
+ // check that notifications are sent
+ b.clear();
+ DataBag.notifyInterval = 2;
+ Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1);
+ for (int i = 0; i < 10; i++) {
+ b.add(g);
+ }
+
+ Iterator it = b.content();
+ while (it.hasNext()) it.next();
+ assert(b.numNotifies == 5);
}
@Test
@@ -287,7 +299,7 @@
}
bag.sort();
-
+ DataBag.notifyInterval = 100;
it = bag.content();
count = 0;
last= "";
@@ -300,6 +312,8 @@
}
assertTrue(bag.cardinality() == count);
+ int cnt = numItems/DataBag.notifyInterval;
+ assertTrue(bag.numNotifies >= cnt);
bag.clear();